
Part 3 - Process data

Let’s learn by example.

In parts 1 & 2, we extracted data from GitHub and loaded it into a (local) PostgreSQL database. Now it is time to have more fun. We will load all attributes of the data we selected previously, and then build a model listing the different authors of commits to our repository.

In this part, we're going to unleash dbt (data build tool) on our data to transform it into meaningful information. Don't worry: you don't need to know anything about dbt, because this tutorial is self-contained. You also don't need to install dbt yourself, since it runs as a Meltano plugin.

Of course you can choose any other processing tool like:

  • a Jupyter notebook
  • or plain Python

to process your data.

If you're having trouble throughout this tutorial, you can always head over to the Slack channel to get help.

Select more source data

To get all the data from the GitHub commits, you can use the meltano select command:

meltano select tap-github commits "*"

This will add the following line to your project file:

extractors:
- name: tap-github
  [...]
  select:
  - commits.url # < technically not necessary anymore, but no need to delete
  - commits.sha # < technically not necessary anymore, but no need to delete
  - commits.commit_timestamp # < technically not necessary anymore, but no need to delete
  - commits.* # < new data.
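
The comments in the snippet note that the three explicit entries are no longer needed once the wildcard is present. A minimal Python sketch of why, assuming select patterns behave like shell-style globs (approximated here with the standard-library fnmatch module; this is not Meltano's own code, and issues.title is a made-up attribute used only as a non-matching example):

```python
from fnmatch import fnmatchcase

# Select patterns as they appear in the project file after running the command.
patterns = ["commits.url", "commits.sha", "commits.commit_timestamp", "commits.*"]

# Attribute names in "<stream>.<attribute>" form. "issues.title" is a
# hypothetical attribute of another stream, included to show a non-match.
attributes = ["commits.url", "commits.sha", "commits.commit", "issues.title"]


def is_selected(attribute, patterns):
    """Return True if any pattern matches the attribute (glob semantics)."""
    return any(fnmatchcase(attribute, pattern) for pattern in patterns)


selected = [a for a in attributes if is_selected(a, patterns)]
print(selected)

# The wildcard on its own selects the same attributes as the full list.
wildcard_only = [a for a in attributes if is_selected(a, ["commits.*"])]
print(wildcard_only == selected)
```

Under this assumption, keeping the older entries is harmless because commits.* already covers them.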

To refresh your database tables after the configuration changes you can run meltano run --full-refresh tap-github target-postgres:

$ meltano run --full-refresh tap-github target-postgres
2022-09-22T07:36:52.985090Z [info ] Environment 'dev' is active
{"type": "STATE", "value": [...]}
INFO Starting sync of repository: sbalnojan/meltano-lightdash
{"type": "SCHEMA", "stream": "commits", [...]

INFO METRIC: {"type": "timer", "metric": [...]

{"type": "RECORD", "stream": "commits", "record": {"sha": "c771a832720c0f87b3ce53ac12bdcbf742df4e3d", "commit": {"author": {"name": "Horst", "email":
[...]
"sbalnojan/meltano-lightdash"}, "time_extracted": "2022-09-22T07:37:06.289545Z"}

...[many more records]...

{"type": "STATE", "value": {"bookmarks": {"sbalnojan/meltano-lightdash": {"commits": {"since": "2022-09-22T07:37:06.289545Z"}}}}}
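
The output above is a stream of JSON messages, one per line, each tagged with a type (SCHEMA, RECORD or STATE) and mixed with plain log lines. As a rough sketch of how such a stream could be inspected in plain Python, the snippet below tallies the message types. The sample lines are shortened stand-ins shaped like the log, not real tap output, and count_message_types is a hypothetical helper:

```python
import json
from collections import Counter

# Shortened stand-in messages shaped like the log output; not real tap output.
sample_lines = [
    '{"type": "SCHEMA", "stream": "commits", "schema": {}}',
    '{"type": "RECORD", "stream": "commits", "record": {"sha": "c771a832720c0f87b3ce53ac12bdcbf742df4e3d"}}',
    "INFO Starting sync of repository: sbalnojan/meltano-lightdash",
    '{"type": "STATE", "value": {"bookmarks": {}}}',
]


def count_message_types(lines):
    """Count JSON messages by their "type" field, skipping plain log lines."""
    counts = Counter()
    for line in lines:
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # plain-text log line such as "INFO Starting sync ..."
        if isinstance(message, dict) and "type" in message:
            counts[message["type"]] += 1
    return counts


counts = count_message_types(sample_lines)
print(dict(counts))
```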

Next, we add the dbt plugin to transform this data.

Install and configure the Postgres-specific dbt utility

dbt uses different adapters depending on the database/warehouse/platform you use. Meltano dbt utilities match this pattern; in this case our utility is dbt-postgres. As usual, you can use the meltano add command to add it to your project.

$ meltano add utility dbt-postgres
2022-09-22T11:32:35.601357Z [info ] Environment 'dev' is active
Added utility 'dbt-postgres' to your Meltano project
...
2024-01-01T00:25:40.604941Z [info ] Installing utility 'dbt-postgres'
...
Installed utility 'dbt-postgres'
2024-01-01T00:25:53.152127Z [info ] Installed utility 'dbt-postgres'
2024-01-01T00:25:53.152894Z [info ] Installed 1/1 plugins

Initialize dbt

Next you can run the initialize command to have the transformer utility populate the project scaffold for dbt.

$ meltano invoke dbt-postgres:initialize
2022-09-22T07:36:52.985090Z [info ] Environment 'dev' is active
creating dbt profiles directory path=PosixPath('/[...]/my-meltano-project/transform/profiles/postgres')
dbt initialized dbt_ext_type=postgres dbt_profiles_dir=PosixPath('/[...]/my-meltano-project/transform/profiles/postgres') dbt_project_dir=PosixPath('/[...]/my-meltano-project/transform')

You can verify that this worked by checking that the `transform` directory is now populated with dbt configuration files.

Configure dbt

Configure the dbt-postgres utility to use the same configuration as our target-postgres loader using meltano config:

$ meltano config dbt-postgres set host localhost
  Utility 'dbt-postgres' setting 'host' was set in `meltano.yml`: 'localhost'
$ meltano config dbt-postgres set port 5432
  Utility 'dbt-postgres' setting 'port' was set in `meltano.yml`: 5432
$ meltano config dbt-postgres set user meltano
  Utility 'dbt-postgres' setting 'user' was set in `meltano.yml`: 'meltano'
$ meltano config dbt-postgres set password password
  Utility 'dbt-postgres' setting 'password' was set in `.env`: (redacted)
$ meltano config dbt-postgres set dbname postgres
  Utility 'dbt-postgres' setting 'dbname' was set in `meltano.yml`: 'postgres'
$ meltano config dbt-postgres set schema analytics
  Utility 'dbt-postgres' setting 'schema' was set in `meltano.yml`: 'analytics'

The result of your configuration will look like this in your meltano.yml. Remember that sensitive settings, such as the password, are stored in your .env file instead:

utilities:
- name: dbt-postgres
  [...]
  config:
    host: localhost
    port: 5432
    user: meltano
    dbname: postgres
    schema: analytics
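
To see how these individual settings fit together, here is a small sketch that assembles them into a standard PostgreSQL connection URI. It is only an illustration: build_postgres_uri is a hypothetical helper, not something Meltano or dbt provides, and the schema setting is left out because it is not part of the connection URI.

```python
from urllib.parse import quote


def build_postgres_uri(host, port, user, password, dbname):
    """Build a libpq-style connection URI from the individual settings."""
    # Percent-encode user, password and database name so that special
    # characters do not break the URI.
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(dbname, safe='')}"
    )


# Values from the meltano config commands above; the password lives in .env.
uri = build_postgres_uri("localhost", 5432, "meltano", "password", "postgres")
print(uri)
```

A URI like this can be handed to psql or a PostgreSQL client library to confirm that the credentials work before running dbt.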

Add our source data to dbt

The EL pipeline run already added our source data into the schema tap_github as table commits. dbt will need to know where to locate this data. Let's add that to our dbt project:

mkdir transform/models/tap_github

Add a file called transform/models/tap_github/source.yml into this directory with the following content:

config-version: 2
version: 2
sources:
- name: tap_github # the name we want to reference this source by
  schema: tap_github # the schema the raw data was loaded into
  tables:
  - name: commits

Now we're able to reference the table using the keyword "source" as you can see next.
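
Conceptually, a source reference is a lookup from the declared source and table names to a fully qualified relation. The toy Python function below mimics that lookup for the declaration above. It is a sketch of the idea, not dbt's implementation; the SOURCES registry and the default database name (taken from the dbname setting) are assumptions made for illustration.

```python
# Toy registry mirroring source.yml: (source name, table name) -> schema.
SOURCES = {("tap_github", "commits"): "tap_github"}


def source(source_name, table_name, database="postgres"):
    """Return a quoted, fully qualified relation name for a declared source."""
    try:
        schema = SOURCES[(source_name, table_name)]
    except KeyError:
        # Undeclared sources fail loudly instead of producing a bad query.
        raise ValueError(
            f"source {source_name}.{table_name} is not declared"
        ) from None
    return f'"{database}"."{schema}"."{table_name}"'


print(source("tap_github", "commits"))
```

The benefit of this indirection is that the schema name is written down once, in source.yml, rather than in every model.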

Add a transformed model

Add a file called authors.sql to the folder transform/models/tap_github with the following contents:

{{
  config(
    materialized='table'
  )
}}

with base as (
    select *
    from {{ source('tap_github', 'commits') }}
)
select distinct (commit -> 'author' -> 'name') as authors
from base

This model is configured to create a table via the materialized='table' configuration. The keyword source is used in dbt to reference the source we just declared. The model itself selects the distinct author names from the commits; each name is nested inside the JSON blob stored in the commit column.
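
For readers more at home in Python than SQL, the same logic can be sketched on in-memory data. The records below are made up and only reproduce the nesting (commit, then author, then name) seen in the RECORD messages earlier. The second half illustrates a PostgreSQL detail, assuming the commit column is stored as json or jsonb: the -> operator returns a JSON value, so a string keeps its quotes when displayed, while ->> would return plain text.

```python
import json

# Made-up commit records; only the nesting mirrors the tutorial's data.
commits = [
    {"sha": "a1", "commit": {"author": {"name": "Horst"}}},
    {"sha": "b2", "commit": {"author": {"name": "Horst"}}},
    {"sha": "c3", "commit": {"author": {"name": "Ada"}}},
]

# Equivalent of: select distinct (commit -> 'author' -> 'name') as authors
# A set removes duplicates; sorting only makes the output deterministic
# (SQL "distinct" does not guarantee any order).
authors = sorted({c["commit"]["author"]["name"] for c in commits})
print(authors)

# With "->" the value stays JSON, so each name is shown with its quotes.
as_json = [json.dumps(name) for name in authors]
print(as_json)
```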

Run the transformation process

To create the actual table, we run the dbt model via meltano invoke dbt-postgres:run. Note that this relies on having previously run meltano run --full-refresh tap-github target-postgres to populate the commits table in your database:

$ meltano invoke dbt-postgres:run
2022-09-22T12:30:31.842691Z [info ] Environment 'dev' is active
Extension executing `dbt clean`...
[...]
20:45:09 Finished cleaning all paths.


Extension executing `dbt deps`...
20:45:12 Running with dbt=1.3.4
20:45:12 Warning: No packages were found in packages.yml


Extension executing `dbt run`...
20:45:15 Running with dbt=1.3.4
20:45:15 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 289 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
20:45:15
20:45:15 Concurrency: 2 threads (target='dev')
20:45:15
20:45:15 1 of 1 START sql table model analytics.authors ................................. [RUN]
20:45:15 1 of 1 OK created sql table model analytics.authors ............................ [SELECT 1 in 0.14s]
20:45:15
20:45:15 Finished running 1 table model in 0 hours 0 minutes and 0.31 seconds (0.31s).
20:45:15
20:45:15 Completed successfully
20:45:15
20:45:15 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

You can check the data inside the database using your favourite SQL editor. There should now be a table inside the analytics schema called authors populated with data.

Run the complete pipeline

To check that everything works together as a pipeline, we clean up once more and run the whole pipeline. Drop the tap_github.commits and the analytics.authors tables by running:

docker exec meltano_postgres psql -d postgres -U meltano -c 'DROP TABLE tap_github.commits; DROP TABLE analytics.authors;'

Run the final pipeline all together, using the parameter --full-refresh to ignore the stored state:

$ meltano run --full-refresh tap-github target-postgres dbt-postgres:run
[warning ] Performing full refresh, ignoring state left behind by any previous runs.

[info ] INFO Starting sync of repository: sbalnojan/meltano-lightdash
[...]
[info ] INFO METRIC: {"type": "timer", "metric": "http_request_duration",[...]

[info ] INFO METRIC: {"type": "counter", "metric": "record_count", "value": 21 [...]

[info ] time=2022-09-22 12:42:57 name=target_postgres level=INFO message=Table '"commits"' [...]

[...]
[info ] Incremental state has been updated at 2022-09-22 12:42:58.260520.
[info ] Block run completed. block_type=ExtractLoadBlocks err=None set_number=0 success=True
[info ] 12:43:19 Running with dbt=1.1.2 cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:20 Found 1 model, [...]

[info ] 12:43:20 cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:20 Concurrency: 2 threads (target='dev') cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:20 cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:20 1 of 1 START table model analytics.authors ..................................... [RUN] [...]
[info ] 12:43:21 1 of 1 OK created table model analytics.authors .........[...]

[info ] 12:43:21 cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:21 Finished running 1 table model in 1.34s. cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:21 cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:21 Completed successfully cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:21 cmd_type=command name=dbt-postgres stdio=stderr

[info ] 12:43:21 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1 cmd_type=command name=dbt-postgres stdio=stderr

[info ] Block run completed. block_type=InvokerCommand err=None set_number=1 success=True

There we have it, a complete data pipeline.
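
If you want to trigger this pipeline from a script, for example from a scheduler, one possible approach is a thin Python wrapper around the same command. This is a sketch under the assumption that the meltano executable is on your PATH and that the script runs inside the project directory; build_run_command and run_pipeline are hypothetical helper names, not part of Meltano.

```python
import shlex
import subprocess


def build_run_command(full_refresh=False):
    """Assemble the `meltano run` command used in this tutorial."""
    command = ["meltano", "run"]
    if full_refresh:
        # Ignore the stored state, as in the command shown above.
        command.append("--full-refresh")
    command += ["tap-github", "target-postgres", "dbt-postgres:run"]
    return command


def run_pipeline(full_refresh=False):
    """Run the pipeline; raises CalledProcessError on a non-zero exit code."""
    subprocess.run(build_run_command(full_refresh), check=True)


# Print the command instead of executing it, so the sketch has no side effects.
command = build_run_command(full_refresh=True)
print(shlex.join(command))
```

Calling run_pipeline() executes the extract, load and transform steps in the same order as the command line above.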