
Full Files

The code snippets in this section are taken from extracts/measurement_extract.py and etl_steps/parse_measurements.py. The full files are shown below:

alice_bob_model/extracts/measurement_extract.py
from os import listdir
from os.path import join
from typing import List

from dbgen import Extract


class MeasurementExtract(Extract):
    data_dir: str
    outputs: List[str] = ["filename", "contents"]

    def extract(self):
        fnames = listdir(self.data_dir)
        for fname in fnames:
            with open(join(self.data_dir, fname)) as f:
                contents = f.read()
            yield fname, contents

alice_bob_model/etl_steps/parse_measurements.py
import re
from os.path import join
from typing import Tuple

from alice_bob_model.constants import DATA_DIR, DEFAULT_ENV
from alice_bob_model.extracts.measurement_extract import MeasurementExtract
from alice_bob_model.schema import Person, TemperatureMeasurement

from dbgen import Environment, ETLStep, Import, Model, transform


@transform(
    outputs=["first_name", "last_name", "ordering", "temperature"],
    env=DEFAULT_ENV + Environment(Import("re")),
)
def parse_measurements(file_name: str, contents: str) -> Tuple[str, str, int, float]:
    regex = r"([A-Za-z]+)_([A-Za-z]+)_(\d+)\.txt"
    match = re.match(regex, file_name)
    if not match:
        raise ValueError("No match found")
    first_name, last_name, ordering_str = match.groups()
    ordering = int(ordering_str)

    regex = r".*:\s*(\d+)"
    match = re.match(regex, contents)
    if not match:
        raise ValueError("No match found")
    (temperature_str,) = match.groups()
    temperature = float(temperature_str)

    return first_name, last_name, ordering, temperature


def add_temperature_etl_step(model: Model) -> None:
    with model:
        with ETLStep(name="temperature"):
            filename, contents = MeasurementExtract(data_dir=join(DATA_DIR, "measurements")).results()
            first_name, last_name, ordering, temperature = parse_measurements(filename, contents).results()
            TemperatureMeasurement.load(
                insert=True,
                temperature_F=temperature,
                ordering=ordering,
                person_id=Person.load(first_name=first_name, last_name=last_name),
            )

Handling Data with Custom Formats

Oftentimes, scientific data is stored in custom file formats, and we need to be able to ingest it into the database. In this example, we assume there is a folder of temperature measurements on the local file system. The file names have the format FirstName_LastName_MeasurementNumber.txt, and each file contains text similar to the following:

T (F): 60

Writing the Custom Extract

Defining the Attributes

First, we subclass the Extract class (imported from dbgen) and define the necessary inputs.

from os import listdir
from os.path import join
from typing import List

from dbgen import Extract


class MeasurementExtract(Extract):
    data_dir: str
    outputs: List[str] = ["filename", "contents"]

    def extract(self):
        fnames = listdir(self.data_dir)
        for fname in fnames:
            with open(join(self.data_dir, fname)) as f:
                contents = f.read()
            yield fname, contents

The data_dir attribute is a string that specifies the path to the folder in which our text files are stored.

As always, we need to define the output names. In this case, since we need information from both the filename and the file contents, we will have two outputs.

Defining the extract Method

Finally, we need to override the extract method. As always, the extract method must be a generator. In this case, we would like to yield two strings: the file name and the file contents.

We get a list of the filenames in data_dir, then loop over them: for each file, we read its contents and yield the filename together with the contents.

class MeasurementExtract(Extract):
    data_dir: str
    outputs: List[str] = ["filename", "contents"]

    def extract(self):
        fnames = listdir(self.data_dir)
        for fname in fnames:
            with open(join(self.data_dir, fname)) as f:
                contents = f.read()
            yield fname, contents

These strings will be parsed in the transform step. In general, if there is something that can be done in either the extract step or the transform step, it is better to do it in the transform step because the exception handling for transforms is more advanced. Extracts should remain as simple as possible.

Defining the corresponding transform

The first step in defining a transform is to declare the output names and the Python environment needed to run the function. In this case, we only need the built-in re module, so we can use our default Python environment (DEFAULT_ENV) extended with an import of re.

We want our function to return four items: the researcher's first name and last name, the order in which the measurement was taken, and the temperature measurement itself.

import re
from os.path import join
from typing import Tuple

from alice_bob_model.constants import DATA_DIR, DEFAULT_ENV
from alice_bob_model.extracts.measurement_extract import MeasurementExtract
from alice_bob_model.schema import Person, TemperatureMeasurement

from dbgen import Environment, ETLStep, Import, Model, transform

Next, we need to write a custom function that parses the filename and file contents to extract the information that we are interested in.

@transform(
    outputs=["first_name", "last_name", "ordering", "temperature"],
    env=DEFAULT_ENV + Environment(Import("re")),
)
def parse_measurements(file_name: str, contents: str) -> Tuple[str, str, int, float]:
    regex = r"([A-Za-z]+)_([A-Za-z]+)_(\d+)\.txt"
    match = re.match(regex, file_name)
    if not match:
        raise ValueError("No match found")
    first_name, last_name, ordering_str = match.groups()
    ordering = int(ordering_str)

    regex = r".*:\s*(\d+)"
    match = re.match(regex, contents)
    if not match:
        raise ValueError("No match found")
    (temperature_str,) = match.groups()
    temperature = float(temperature_str)

    return first_name, last_name, ordering, temperature
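Because the body of parse_measurements is ordinary Python, the parsing logic is easy to sanity-check outside dbgen. The sketch below repeats the same two regexes in an undecorated function (renamed parse_measurement here to avoid confusion with the transform above; the sample inputs are made up):

```python
import re


def parse_measurement(file_name: str, contents: str):
    """Extract (first_name, last_name, ordering, temperature) from one file."""
    name_match = re.match(r"([A-Za-z]+)_([A-Za-z]+)_(\d+)\.txt", file_name)
    if not name_match:
        raise ValueError(f"Unexpected filename: {file_name}")
    first_name, last_name, ordering_str = name_match.groups()

    temp_match = re.match(r".*:\s*(\d+)", contents)
    if not temp_match:
        raise ValueError(f"Unexpected contents: {contents}")
    return first_name, last_name, int(ordering_str), float(temp_match.group(1))
```

For example, parse_measurement("Alice_Smith_1.txt", "T (F): 60") returns ("Alice", "Smith", 1, 60.0), and a filename that does not match the expected pattern raises a ValueError.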

Inserting the values into the database

Next, in order to insert these values into the database, we need to define a dbgen ETLStep. The standard pattern is to define a function that accepts the model and adds the new ETLStep to it. By using with model, all ETLSteps defined in that with block are automatically added to the model. Similarly, by using with ETLStep(...), all extracts, transforms, and loads instantiated in that with block are automatically added to the new ETLStep. Lines similar to the ones shown below are used almost every time a new ETLStep is defined.

def add_temperature_etl_step(model: Model) -> None:
    with model:
        with ETLStep(name="temperature"):
            filename, contents = MeasurementExtract(data_dir=join(DATA_DIR, "measurements")).results()
            first_name, last_name, ordering, temperature = parse_measurements(filename, contents).results()
            TemperatureMeasurement.load(
                insert=True,
                temperature_F=temperature,
                ordering=ordering,
                person_id=Person.load(first_name=first_name, last_name=last_name),
            )

Next, we need to instantiate the custom extract we defined above. By calling .results() on the instance of our custom extract class, a tuple of the outputs is returned.

filename, contents = MeasurementExtract(data_dir=join(DATA_DIR, "measurements")).results()

After that, we want to pass the results from this extract to our custom transform (the parser defined above). Similarly, by calling .results() on the transform, a tuple of the outputs is returned.

first_name, last_name, ordering, temperature = parse_measurements(filename, contents).results()

Finally, we call .load(...) on the table that we would like to insert data into, and we pass the values output by the transform as keyword arguments to the .load method. An important point is that any call to .load returns the ID of the row specified in the .load(...) statement. We do not always need to use this information, but we do need it to populate foreign keys. Simply put, foreign keys are always populated by calling the .load method on the table that you would like to create a foreign key to, as shown in the last line below.

TemperatureMeasurement.load(
    insert=True,
    temperature_F=temperature,
    ordering=ordering,
    person_id=Person.load(first_name=first_name, last_name=last_name),
)

Running the Model

We can run the model again to see the effects of our new ETL step. To run the model, enter the command:

dbgen run

To see information about the most recent run of the model, enter the command dbgen run status. In this case, we should see that 30 rows were inserted.