Part 3 - Process data
Let’s learn by example.
In parts 1 & 2, we extracted data from GitHub and loaded it into a (local) PostgreSQL database. Now it is time to have more fun. We decide to load all attributes from the data we selected previously, and then build a model listing the different authors of commits to our repository.
That means, in this part we're going to unleash dbt (data build tool) onto our data to transform it into meaningful information. Don't worry, you don't need to know anything about dbt, this tutorial is self-contained. You do not need to install dbt yourself, it works as a Meltano plugin.
Of course you can choose any other processing tool like:
- a jupyter notebook
- or plain Python
to process your data.
If you're having trouble throughout this tutorial, you can always head over to the Slack channel to get help.
Select more source data
To get all the data from the GitHub commits, you can use the meltano select
command:
meltano select tap-github commits "*"
This will add the following line to your project file:
extractors:
- name: tap-github
[...]
select:
- commits.url # <== technically not necessary anymore, but no need to delete
- commits.sha # <== technically not necessary anymore, but no need to delete
- commits.commit_timestamp # <== technically not necessary anymore, but no need to delete
- commits.* # <== new data.
To refresh your database tables after the configuration changes you can run meltano run --full-refresh tap-github target-postgres
:
$ meltano run --full-refresh tap-github target-postgres
2024-09-22T07:36:52.985090Z [info ] Environment 'dev' is active
{"type": "STATE", "value": [...]}
INFO Starting sync of repository: sbalnojan/meltano-lightdash
---> 100%
{"type": "SCHEMA", "stream": "commits", [...]
INFO METRIC: {"type": "timer", "metric": [...]
{"type": "RECORD", "stream": "commits", "record": {"sha": "c771a832720c0f87b3ce53ac12bdcbf742df4e3d", "commit": {"author": {"name": "Horst", "email":
[...]
"sbalnojan/meltano-lightdash"}, "time_extracted": "2024-09-22T07:37:06.289545Z"}
...[many more records]...
{"type": "STATE", "value": {"bookmarks": {"sbalnojan/meltano-lightdash": {"commits": {"since": "2024-09-22T07:37:06.289545Z"}}}}}
Next, we add the dbt plugin to transform this data.
Install and configure the Postgres-specific dbt utility
dbt uses different adapters depending on the database/warehouse/platform you use. Meltano dbt utilities match this pattern; in this case our utility is dbt-postgres
. As usual, you can use the meltano add
command to add it to your project.
$ meltano add utility dbt-postgres
2024-09-22T11:32:35.601357Z [info ] Environment 'dev' is active
Added utility 'dbt-postgres' to your Meltano project
...
2024-01-01T00:25:40.604941Z [info ] Installing utility 'dbt-postgres'
---> 100%
...
Installed utility 'dbt-postgres'
2024-01-01T00:25:53.152127Z [info ] Installed utility 'dbt-postgres'
2024-01-01T00:25:53.152894Z [info ] Installed 1/1 plugins
Initialize dbt
Next you can run the initialize
command to have the transformer utility populate the project scaffold for dbt.
$ meltano invoke dbt-postgres:initialize
2024-09-22T07:36:52.985090Z [info ] Environment 'dev' is active
creating dbt profiles directory path=PosixPath('/[...]/my-meltano-project/transform/profiles/postgres')
dbt initialized dbt_ext_type=postgres dbt_profiles_dir=PosixPath('/[...]/my-meltano-project/transform/profiles/postgres') dbt_project_dir=PosixPath('/[...]/my-meltano-project/transform')
Installing dbt-postgres may require building psycopg2
from source. An easy workaround is to constrain dbt-postgres
in the plugin's pip_url
in your meltano.yml
file:
utilities:
- name: dbt-postgres
pip_url: dbt-core dbt-postgres<1.8 meltano-dbt-ext~=0.3.0
You can verify that this worked by viewing that the transform
directory is newly populated with dbt configuration files.
Configure dbt
Configure the dbt-postgres utility to use the same configuration as our target-postgres loader using meltano config
:
$ meltano config dbt-postgres set host localhost
  Utility 'dbt-postgres' setting 'host' was set in `meltano.yml`: 'localhost'
$ meltano config dbt-postgres set port 5432
  Utility 'dbt-postgres' setting 'port' was set in `meltano.yml`: 5432
$ meltano config dbt-postgres set user meltano
  Utility 'dbt-postgres' setting 'user' was set in `meltano.yml`: 'meltano'
$ meltano config dbt-postgres set password password
  Utility 'dbt-postgres' setting 'password' was set in `.env`: (redacted)
$ meltano config dbt-postgres set dbname postgres
  Utility 'dbt-postgres' setting 'dbname' was set in `meltano.yml`: 'postgres'
$ meltano config dbt-postgres set schema analytics
  Utility 'dbt-postgres' setting 'schema' was set in `meltano.yml`: 'analytics'
The result of your configuration will look like this in your meltano.yml, remember that sensitive configurations are in your .env file:
utilities:
- name: dbt-postgres
[...]
config:
host: localhost
port: 5432
user: meltano
dbname: postgres
schema: analytics
Add our source data to dbt
The EL pipeline run already added our source data into the schema tap_github
as table commits
. dbt will need to know where to locate this data. Let's add that to our dbt project:
mkdir transform/models/tap_github
Add a file called transform/models/tap_github/source.yml
into this directory with the following content:
config-version: 2
version: 2
sources:
- name: tap_github # the name we want to reference this source by
schema: tap_github # the schema the raw data was loaded into
tables:
- name: commits
Now we're able to reference the table using the keyword "source" as you can see next.
Add a transformed model
Add a file called authors.sql
to the folder transform/models/tap_github
with the following contents:
{{
config(
materialized='table'
)
}}
with base as (
select *
from {{ source('tap_github', 'commits') }}
)
select distinct (commit -> 'author' -> 'name') as authors
from base
This model is configured to creating a table via the materialized='table'
configuration. The keyword source
is used in dbt to reference the source we just created. The actual model selects the distinct author names from the commits which are wrapped into a JSON blob.
Run the transformation process
To create the actual table, we run the dbt model via meltano invoke dbt-postgres:run
. Note this relies on previously running meltano run --full-refresh tap-github target-postgres
to postgres your database commits
table:
$ meltano invoke dbt-postgres:run
2024-09-22T12:30:31.842691Z [info ] Environment 'dev' is active
Extension executing `dbt clean`...
[...]
20:45:09 Finished cleaning all paths.
Extension executing `dbt deps`...
20:45:12 Running with dbt=1.3.4
20:45:12 Warning: No packages were found in packages.yml
Extension executing `dbt run`...
20:45:15 Running with dbt=1.3.4
20:45:15 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 289 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
20:45:15
20:45:15 Concurrency: 2 threads (target='dev')
20:45:15
20:45:15 1 of 1 START sql table model analytics.auhtors ................................. [RUN]
20:45:15 1 of 1 OK created sql table model analytics.auhtors ............................ [SELECT 1 in 0.14s]
20:45:15
20:45:15 Finished running 1 table model in 0 hours 0 minutes and 0.31 seconds (0.31s).
20:45:15
20:45:15 Completed successfully
20:45:15
20:45:15 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
#
If you encounter an error related to dbt not being to clean files outside of the project directory, you can add the following to your meltano.yml
file:
version: 1
env:
DBT_CLEAN_PROJECT_FILES_ONLY: "false"
You can check the data inside the database using your favourite SQL editor. There should now be a table inside the analytics
schema called authors
populated with data.
Run the complete pipeline
To check that everything works together as a pipeline, we clean out once more and run the whole pipeline. Drop the tap_github.commits and the analytics.authors tables by running
docker exec meltano_postgres psql -d postgres -U meltano -c 'DROP TABLE tap_github.commits; DROP TABLE analytics.authors;'
Run the final pipeline alltogether using the parameter --full-refresh
to ignore the stored state:
$ meltano run --full-refresh tap-github target-postgres dbt-postgres:run
[warning ] Performing full refresh, ignoring state left behind by any previous runs.
[info ] INFO Starting sync of repository: sbalnojan/meltano-lightdash
<font color="red">[...]</font>
[info ] INFO METRIC: {"type": "timer", "metric": "http_request_duration",[...]
[info ] INFO METRIC: {"type": "counter", "metric": "record_count", "value": 21 [...]
[info ] time=2024-09-22 12:42:57 name=target_postgres level=INFO message=Table '"commits"' [...]
[...]
---> 100%
[info ] Incremental state has been updated at 2024-09-22 12:42:58.260520.
[info ] Block run completed. block_type=ExtractLoadBlocks err=None set_number=0 success=True
[info ] 12:43:19 Running with dbt=1.1.2 cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:20 Found 1 model, [...]
[info ] 12:43:20 cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:20 Concurrency: 2 threads (target='dev') cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:20 cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:20 1 of 1 START table model analytics.authors ..................................... [RUN] [...]
---> 100%
[info ] 12:43:21 1 of 1 OK created table model analytics.authors .........[...]
[info ] 12:43:21 cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:21 Finished running 1 table model in 1.34s. cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:21 cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:21 Completed successfully cmd_type=command name=dbt-postgres stdio=stderr
info ] 12:43:21 cmd_type=command name=dbt-postgres stdio=stderr
[info ] 12:43:21 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1 cmd_type=command name=dbt-postgres stdio=stderr
[info ] Block run completed. block_type=InvokerCommand err=None set_number=1 success=True
There we have it, a complete data pipeline.