Census (dagster-census)

This library provides an integration with Census.

class dagster_census.CensusComponent [source]

Loads Census syncs from a Census workspace as Dagster assets. Materializing these assets will trigger the Census sync, enabling you to schedule Census syncs using Dagster.

Example:

# defs.yaml

type: dagster_census.CensusComponent
attributes:
  workspace:
    api_key: "{{ env.CENSUS_API_KEY }}"
  sync_selector:
    by_name:
      - my_first_sync
      - my_second_sync
execute [source]

Executes the selected Census sync.

This method can be overridden in a subclass to customize the sync execution behavior, such as adding custom logging or handling sync results differently.

Parameters:

  • context – The asset execution context provided by Dagster
  • census – The CensusResource used to trigger and monitor syncs

Returns: MaterializeResult event from the Census sync

Example:

Override this method to add custom logging during sync execution:

from dagster_census import CensusComponent
import dagster as dg

class CustomCensusComponent(CensusComponent):
    def execute(self, context, census):
        context.log.info(f"Starting Census sync for {context.asset_key}")
        result = super().execute(context, census)
        context.log.info("Census sync completed successfully")
        return result
get_asset_spec [source]

To use the Census component, see the Census component integration guide.

dagster_census.census_trigger_sync_op OpDefinition [source]

Executes a Census sync for a given sync_id and polls until that sync completes, raising an error if it is unsuccessful.

It outputs a CensusOutput, which contains the details of the Census sync after it completes successfully.

It requires the use of the census_resource, which allows it to communicate with the Census API.

Examples:
from dagster import job
from dagster_census import census_resource, census_trigger_sync_op

my_census_resource = census_resource.configured(
    {
        "api_key": {"env": "CENSUS_API_KEY"},
    }
)

# Bind the op to a single sync by pre-configuring its sync_id.
sync_foobar = census_trigger_sync_op.configured({"sync_id": "foobar"}, name="sync_foobar")

@job(resource_defs={"census": my_census_resource})
def my_simple_census_job():
    sync_foobar()
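
The "trigger, then poll until the sync completes, raising an error if it is unsuccessful" behavior described above can be illustrated with a minimal, library-free sketch. This is not the dagster_census implementation: the function poll_until_done, its parameters, and the status strings "working", "completed", and "failed" are all assumptions made for illustration, and the API call is replaced by a stand-in that returns canned responses.

```python
import time
from typing import Any, Callable, Dict, Optional


def poll_until_done(
    get_status: Callable[[], Dict[str, Any]],
    poll_interval: float = 0.0,
    poll_timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Call get_status repeatedly until it reports a terminal state.

    The status values checked here are hypothetical placeholders.
    """
    start = time.monotonic()
    while True:
        run = get_status()
        status = run.get("status")
        if status == "completed":
            # Success: hand the final run details back to the caller.
            return run
        if status == "failed":
            # Unsuccessful sync: surface it as an error instead of returning.
            raise RuntimeError(f"Sync run failed: {run}")
        if poll_timeout is not None and time.monotonic() - start > poll_timeout:
            raise TimeoutError(f"Sync run did not finish within {poll_timeout}s")
        time.sleep(poll_interval)


# Stand-in for an API call: reports "working" twice, then "completed".
_responses = iter(
    [{"status": "working"}, {"status": "working"}, {"status": "completed"}]
)
final_run = poll_until_done(lambda: next(_responses))
```

In a real job the op and resource handle this loop for you; the sketch only shows why a failed sync surfaces as a raised error rather than as an output.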
class dagster_census.CensusResource [source]

This resource allows users to programmatically interface with the Census REST API to launch syncs and monitor their progress. It currently implements only a subset of the functionality exposed by the API.

Examples:
import dagster as dg
from dagster_census import CensusResource

census_resource = CensusResource(
    api_key=dg.EnvVar("CENSUS_API_KEY")
)

@dg.asset
def census_sync_asset(census: CensusResource):
    census.trigger_sync_and_poll(sync_id=123456)

defs = dg.Definitions(
    assets=[census_sync_asset],
    resources={"census": census_resource}
)
class dagster_census.CensusOutput [source]

Contains recorded information about the state of a Census sync after a sync completes.

Parameters:

  • sync_run (Dict[str, Any]) – The details of the specific sync run.
  • source (Dict[str, Any]) – Information about the source for the Census sync.
  • destination (Dict[str, Any]) – Information about the destination for the Census sync.
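
As a rough illustration of how the three fields above might be consumed, the sketch below builds a one-line summary from them. To stay runnable without dagster_census installed it uses a hypothetical stand-in, CensusOutputSketch, that carries the same three documented fields. The dictionary keys "status" and "name" are assumptions about the payload shape, not documented guarantees, so the code reads them with .get() and falls back to defaults.

```python
from typing import Any, Dict, NamedTuple


class CensusOutputSketch(NamedTuple):
    """Hypothetical stand-in with the three documented CensusOutput fields."""

    sync_run: Dict[str, Any]
    source: Dict[str, Any]
    destination: Dict[str, Any]


def summarize(output: CensusOutputSketch) -> str:
    """Build a one-line summary of a finished sync.

    The keys "status" and "name" are assumed for illustration; .get()
    keeps the function from raising if a key is absent.
    """
    status = output.sync_run.get("status", "unknown")
    src = output.source.get("name", "unknown source")
    dst = output.destination.get("name", "unknown destination")
    return f"{src} -> {dst}: {status}"


# Example values are made up for the sketch.
example = CensusOutputSketch(
    sync_run={"status": "completed"},
    source={"name": "warehouse"},
    destination={"name": "crm"},
)
summary = summarize(example)
```

A summary like this could be written to the run log or attached as asset metadata, depending on what the real payload contains.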