Full Files

The code snippets in this section are taken from etl_steps/f_to_c.py. The full file is shown below:

alice_bob_model/etl_steps/f_to_c.py
from alice_bob_model.constants import DEFAULT_ENV
from alice_bob_model.schema import TemperatureMeasurement
from sqlmodel import select

from dbgen import ETLStep, Model, Query, transform


@transform(outputs=["temp_c"], env=DEFAULT_ENV)
def f_to_c(temp_f: float) -> float:
    temp_c = (temp_f - 32.0) * 5.0 / 9.0

    return temp_c


def add_f_to_c(model: Model) -> None:
    with model:
        with ETLStep(name="f_to_c"):
            temperature_measurement_id, temp_f = Query(
                select(TemperatureMeasurement.id, TemperatureMeasurement.temperature_F)
            ).results()
            temp_c = f_to_c(temp_f).results()
            TemperatureMeasurement.load(
                id=temperature_measurement_id,
                temperature_C=temp_c,
            )

Refining Data in the Database

When working with scientific data, it is common to pass data through a series of analysis steps. The nature of these steps is highly domain-specific. To avoid domain-specific jargon, this tutorial walks through a very simple analysis: converting Fahrenheit to Celsius.

In practice, the majority of ETLSteps are of this type.

Defining the ETLStep

To carry out data analysis like this, we need to define a new "extract, transform, load" (ETL) step, which in DBgen means defining a new ETLStep.

In this case, instead of using a custom extract class to define the data source, we will use a query against the database. The transform and load steps are similar to what you have already seen.

Setup

As always, we begin by adding a new file to the etl_steps module and defining a function that accepts the model as an input. The body of the function opens two context managers: first "with model:" and then "with ETLStep(name=...):".

def add_f_to_c(model: Model) -> None:
    with model:
        with ETLStep(name="f_to_c"):
            ...

Query

Instead of using a custom extract, we will use a Query as the data source because the required input data is already in the database.

DBgen uses SQLAlchemy for querying. The query syntax is documented in detail in the SQLAlchemy documentation.

First, Query must be imported from dbgen, and select must be imported from sqlmodel.

For queries that include only one table (which is the case in this example), the syntax is simply: select(TableName.ColumnName_1, TableName.ColumnName_2).

As with the extracts shown in the previous ETLSteps, calling .results() on the Query returns a tuple of the outputs in the order in which they are listed in the select statement.

from alice_bob_model.schema import TemperatureMeasurement
from sqlmodel import select

from dbgen import ETLStep, Model, Query


def add_f_to_c(model: Model) -> None:
    with model:
        with ETLStep(name="f_to_c"):
            temperature_measurement_id, temp_f = Query(
                select(TemperatureMeasurement.id, TemperatureMeasurement.temperature_F)
            ).results()
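
To make the ordering of the query outputs concrete, here is a minimal sketch that runs the equivalent SQL with Python's built-in sqlite3 module. It is not how DBgen executes the Query; the table name temperature_measurement, the column layout, and the sample values are assumptions made for illustration only.

```python
import sqlite3

# In-memory stand-in for the tutorial database. The table and column names
# mirror TemperatureMeasurement, but this schema is an assumption.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE temperature_measurement ("
    "id INTEGER PRIMARY KEY, temperature_F REAL, temperature_C REAL)"
)
conn.executemany(
    "INSERT INTO temperature_measurement (id, temperature_F) VALUES (?, ?)",
    [(1, 32.0), (2, 212.0), (3, -40.0)],
)

# Rough SQL counterpart of
# select(TemperatureMeasurement.id, TemperatureMeasurement.temperature_F)
rows = conn.execute(
    "SELECT id, temperature_F FROM temperature_measurement ORDER BY id"
).fetchall()

# Each row is a tuple whose fields follow the order of the SELECT list,
# so it unpacks as (id, temperature_F).
for temperature_measurement_id, temp_f in rows:
    print(temperature_measurement_id, temp_f)
```

The unpacking order in the for loop matches the column order in the SELECT list, which is the same convention the tuple returned by .results() follows.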

Imports and Environments

Converting F to C is simple enough that the f_to_c transform shown in this section uses plain arithmetic. Transforms that rely on non-built-in libraries (for example, scipy's convert_temperature function) need a Python environment that declares those imports, so this section shows how DBgen environments are defined.

The @transform decorator specifies two things:

  • The output names (a list of strings the same length as the number of the function's outputs)
  • The python environment (a dbgen.Environment object)

To define a dbgen.Environment object, we start by importing Environment and Import from dbgen.

To create an Environment object, we pass a list of Imports to Environment(). To create an Import, we pass two arguments to Import():

  • The name of the python module that we would like to import a function or class from (a string)
  • The name of that function or class (a string)

It is also possible to call Import() with just the first argument to import the entire module.

Essentially, whenever a line like...

from library import function

...appears above a function definition, we need to include...

Import("library", "function")

...in the list of imports in the dbgen Environment.

Finally, dbgen Environment objects can be added together. If a default Environment is used for most transforms and one transform needs a single extra import, there is no need to define a new Environment from scratch that repeats every package: we can create an Environment containing only the new import and add it to the default Environment. The snippet below uses the default Environment, DEFAULT_ENV, which is imported from alice_bob_model.constants.

from alice_bob_model.constants import DEFAULT_ENV
from alice_bob_model.schema import TemperatureMeasurement
from sqlmodel import select

from dbgen import ETLStep, Model, Query, transform


@transform(outputs=["temp_c"], env=DEFAULT_ENV)
def f_to_c(temp_f: float) -> float:
    temp_c = (temp_f - 32.0) * 5.0 / 9.0

    return temp_c
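
The idea that an Import stands for one import statement, and that Environments can be added together, can be sketched with the standard library alone. The classes SketchImport and SketchEnvironment below are hypothetical stand-ins written for this illustration; they are not DBgen's Import and Environment classes and say nothing about how DBgen implements them.

```python
import importlib
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class SketchImport:
    """Hypothetical stand-in for one import statement."""

    module: str
    name: Optional[str] = None

    def resolve(self) -> Dict[str, Any]:
        mod = importlib.import_module(self.module)
        if self.name is None:
            # Like `import module` (keyed by the full module name for simplicity).
            return {self.module: mod}
        # Like `from module import name`.
        return {self.name: getattr(mod, self.name)}


@dataclass(frozen=True)
class SketchEnvironment:
    """Hypothetical stand-in for a collection of imports."""

    imports: List[SketchImport] = field(default_factory=list)

    def __add__(self, other: "SketchEnvironment") -> "SketchEnvironment":
        # Adding two environments yields a new one holding both import lists.
        return SketchEnvironment(self.imports + other.imports)

    def namespace(self) -> Dict[str, Any]:
        resolved: Dict[str, Any] = {}
        for imp in self.imports:
            resolved.update(imp.resolve())
        return resolved


# A "default" environment plus one extra import, combined by addition.
default_env = SketchEnvironment([SketchImport("math", "sqrt")])
extra_env = SketchEnvironment([SketchImport("statistics", "mean")])
combined_env = default_env + extra_env

namespace = combined_env.namespace()
print(sorted(namespace))
```

In this sketch, addition returns a new object and leaves default_env untouched, so a shared default can be extended for a single transform without affecting the others.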

Transform

Once the environment is defined, the transform step is very similar to the previously shown transform steps. We define a function that carries out the desired data analysis and add the @transform decorator to specify the output names and the Python environment. Inside the ETLStep, the function is called on the query output, and .results() is called on the transform to obtain its outputs.

@transform(outputs=["temp_c"], env=DEFAULT_ENV)
def f_to_c(temp_f: float) -> float:
    temp_c = (temp_f - 32.0) * 5.0 / 9.0

    return temp_c


def add_f_to_c(model: Model) -> None:
    with model:
        with ETLStep(name="f_to_c"):
            temperature_measurement_id, temp_f = Query(
                select(TemperatureMeasurement.id, TemperatureMeasurement.temperature_F)
            ).results()
            temp_c = f_to_c(temp_f).results()
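
As a quick sanity check of the conversion formula itself, the same arithmetic can be run as a plain Python function, without the @transform decorator or any DBgen machinery. The name f_to_c_plain and the sample temperatures are chosen for illustration only.

```python
import math


def f_to_c_plain(temp_f: float) -> float:
    """Convert Fahrenheit to Celsius using the same arithmetic as f_to_c."""
    return (temp_f - 32.0) * 5.0 / 9.0


# Well-known reference points: freezing, boiling, and the crossover at -40.
for temp_f in (32.0, 212.0, -40.0, 98.6):
    print(temp_f, "F ->", round(f_to_c_plain(temp_f), 2), "C")

# 98.6 F is not exactly representable in floating point, so compare with a tolerance.
body_temperature_ok = math.isclose(f_to_c_plain(98.6), 37.0, abs_tol=1e-9)
```

Checking the function on known reference points before wrapping it in a transform makes it easier to tell a formula error from a pipeline error later.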

Load

The load step is very similar to what has been shown in the previous ETLSteps. However, in this case, rows are being updated, not added.

As always, TableName.load() is called with keyword arguments. Each keyword is the name of a column, as specified in schema.py, and each value is the corresponding variable defined in the ETLStep by calling .results() on an Extract, Query, or transform.

And, as always, when .load() is called, we must supply either:

  • keyword arguments for each of the identifying columns (as specified in schema.py), or
  • the keyword [tablename] and the corresponding id for the row. This id comes from either:
    • querying for the .id column (as shown in this example), or
    • calling TableName.load(), which always returns the id of the specified row (as shown in the previous ETLStep).

def add_f_to_c(model: Model) -> None:
    with model:
        with ETLStep(name="f_to_c"):
            temperature_measurement_id, temp_f = Query(
                select(TemperatureMeasurement.id, TemperatureMeasurement.temperature_F)
            ).results()
            temp_c = f_to_c(temp_f).results()
            TemperatureMeasurement.load(
                id=temperature_measurement_id,
                temperature_C=temp_c,
            )
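
The difference between updating rows and adding rows can be illustrated with plain SQL. The sketch below uses Python's built-in sqlite3 module and an assumed table layout; it shows the effect of writing a new column value for existing ids, and it is not a description of the SQL that DBgen issues.

```python
import sqlite3

# In-memory stand-in table; the name and columns are assumptions for illustration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE temperature_measurement ("
    "id INTEGER PRIMARY KEY, temperature_F REAL, temperature_C REAL)"
)
conn.executemany(
    "INSERT INTO temperature_measurement (id, temperature_F) VALUES (?, ?)",
    [(1, 32.0), (2, 212.0)],
)

rows_before = conn.execute(
    "SELECT COUNT(*) FROM temperature_measurement"
).fetchone()[0]

# Extract: read the id together with the value to convert.
pairs = conn.execute(
    "SELECT id, temperature_F FROM temperature_measurement ORDER BY id"
).fetchall()

# Transform and load: compute Celsius and write it back to the row with the same id.
updates = [((temp_f - 32.0) * 5.0 / 9.0, row_id) for row_id, temp_f in pairs]
conn.executemany(
    "UPDATE temperature_measurement SET temperature_C = ? WHERE id = ?",
    updates,
)

rows_after = conn.execute(
    "SELECT COUNT(*) FROM temperature_measurement"
).fetchone()[0]
result = conn.execute(
    "SELECT id, temperature_C FROM temperature_measurement ORDER BY id"
).fetchall()
print(rows_before, rows_after, result)
```

Because each write targets an existing id, the row count is unchanged and only the temperature_C column is filled in, which is the behavior described above for loading with the queried id.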

Running the Model

We can run the model again to see the effects of our new ETL step. To run the model, enter the command:

dbgen run

To see information about the attempted run of the model, enter the command dbgen run status. In this case, we should see that 30 rows have been updated.