BI Stack Example Project
If you opted in to adding the example project during `sidetrek init`, you should have a fully functional data pipeline already set up.
Let’s take a look at each part of this example project to see what’s going on.
The Tools
This stack consists of the following tools:
- Dagster for orchestration
- Meltano for data ingestion
- DBT for data transformation
- Minio (local replacement for S3) and Apache Iceberg for data storage
- Trino for data querying
- Superset for data visualization
In the example project, all these tools are pre-configured and connected for you.
Data Pipeline Architecture
The data pipeline consists of five components:
- Data Ingestion: First, we extract the data from our source systems (in our example, that’s the example CSV files) and load them into our storage (Minio + Iceberg) using Meltano.
- Data Storage: This is where all the data is stored. We’re using Minio as a local replacement for S3 and Iceberg as the table format on top of Minio.
- Data Transformation: Once the raw data is in our storage, we transform it into analytics-ready tables using DBT and Trino.
- Orchestration: All the above steps are orchestrated by our orchestrator, Dagster.
- Data Visualization: Finally we visualize our data in Superset.
To sum up:
Data sources (e.g. CSV files) -> Ingestion (Meltano) -> Storage (Minio + Iceberg) -> Transformation (DBT + Trino) -> Visualization (Superset)
…all orchestrated by Dagster.
The Dataset
We've included a simple dataset with four tables: `orders` (100k rows), `customers` (20k rows), `products` (2k rows), and `stores` (200 rows). This emulates typical e-commerce data.
Each table has its own CSV file in the `/your_project/data` directory.
Data Ingestion
Data ingestion is handled by Meltano. Meltano is an open-source EL tool with hundreds of pre-built connectors to popular data sources and targets. Essentially, it handles the EL ("Extract, Load") part of the ELT process.
As a quick reminder, here’s where Meltano is in the project structure:
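```
# Rough sketch of the project layout, reconstructed from the paths referenced
# in this guide -- your generated project may differ slightly.
your_project/
├── dagster/
│   └── your_project/     # Dagster code (jobs, dbt_assets.py, __init__.py)
├── data/                 # example CSV files (orders, customers, products, stores)
├── dbt/
│   └── your_project/     # DBT project (dbt_project.yml, profiles.yml, models/)
├── meltano/              # Meltano project (meltano.yml, extractor and loader config)
├── superset/             # Superset setup (docker/)
└── trino/                # Trino config (etc/catalog/iceberg.properties)
```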
Meltano Extractor and Loader
To use Meltano to ingest data, you need to set up two things: 1) an extractor (“tap”) and 2) a loader (“target”).
In our example project, our data is stored in CSV files and we want to load that data into Iceberg tables, so we’re using the tap-csv extractor and target-iceberg loader.
Note that currently, there is no official Meltano target for Iceberg, so we’ve created a custom target. This is also open-source, so feel free to check out the source code here.
Currently, `target-iceberg` only supports the APPEND operation!
Adding the Extractor
We've already added and configured the extractor for you in the example project, but we'll go through the steps so you can familiarize yourself with the setup.
To install an extractor, you can run a command like this:
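```sh
# A sketch using the standard Meltano CLI -- run it from the /meltano directory
# (or via sidetrek's Meltano wrapper if you prefer).
meltano add extractor tap-csv
```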
Replace `tap-csv` with the name of the extractor you want to add. See the Meltano documentation for more information on how to work with Meltano.
Once the extractor is installed, it needs to be configured. We've also done this for you in the example project. You can see it in the meltano.yml file in the /meltano directory. Its `csv_files_definition` field lets Meltano know where to find the CSV files. If you look at the file that field points to, you'll see something like this:
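```json
[
  { "entity": "orders", "path": "/your_project/data/orders.csv", "keys": ["id"] },
  { "entity": "customers", "path": "/your_project/data/customers.csv", "keys": ["id"] },
  { "entity": "products", "path": "/your_project/data/products.csv", "keys": ["id"] },
  { "entity": "stores", "path": "/your_project/data/stores.csv", "keys": ["id"] }
]
```
(The paths and key columns above are an illustrative sketch; check the example project's definition file for the exact values.)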
It basically tells Meltano where to find the CSV files and what the primary keys are for each table.
Adding the Loader
Once the extractor is set up, it’s time to add the loader. This is where you tell Meltano where to load the data to.
In the example project, we’re loading the data into Iceberg tables. We’ve already added and configured the loader for you in the example project, but if you want to add your own later, you can do it like this:
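```sh
# A sketch -- adds a custom loader plugin named after the example project's target.
meltano add --custom loader target-iceberg
```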
Note that you typically don't need the `--custom` flag if you're using one of the existing Meltano loaders.
Configuring the loader is similar to configuring the extractor. You can find the configuration in the meltano.yml file in the /meltano directory.
It requires AWS credentials, an S3 endpoint (which is a Minio endpoint in our case since we’re using Minio as an S3 replacement for our development environment), and Iceberg-related configurations.
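Here's an illustrative sketch of what that loader configuration can look like (the key names below are assumptions; check the example project's meltano.yml for the actual settings):

```yaml
loaders:
  - name: target-iceberg
    config:
      # Minio credentials stand in for AWS credentials in local development (assumed key names)
      aws_access_key_id: ${AWS_ACCESS_KEY_ID}
      aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY}
      # Minio endpoint acting as the S3 endpoint (assumed key name)
      s3_endpoint: http://localhost:9000
      # Iceberg-related settings, e.g. the catalog and the target schema (assumed key names)
      iceberg_catalog_name: iceberg
      iceberg_catalog_namespace_name: raw
```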
Iceberg is a table format that sits on top of a physical storage layer like S3. When you load data into Iceberg tables, the actual data is stored as files in S3, and Iceberg specifies how those files are organized into tables we’re familiar with.
In essence, Iceberg turns object storage like S3 into a data warehouse.
This is why this loader requires AWS credentials and a Minio endpoint, not just Iceberg configurations.
To learn more about how Iceberg works, check out the Iceberg documentation.
Running Meltano Ingestion
Now that we have the Meltano extractor and loader installed and configured, we can run the ingestion.
You can run it manually by executing the following command:
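```sh
# A sketch, assuming the extractor and loader names above --
# the exact sidetrek syntax may differ; see the Sidetrek docs.
sidetrek run tap-csv target-iceberg
```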
`sidetrek run` here is a simple convenience wrapper around the `meltano run *` command. It runs the Meltano CLI inside the project virtual environment and also sets the cwd to the /meltano directory.
It’s identical to running:
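```sh
# Roughly equivalent (assuming the tap and target names above):
cd meltano && meltano run tap-csv target-iceberg
```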
Running Meltano Ingestion Inside the Dagster Pipeline
We saw how we can trigger the ingestion manually with the CLI, but often we want to run the ingestion inside the orchestrator (Dagster). For example, we might want to schedule the ingestion to run daily.
In the example project, we’ve already added a job to run the Meltano ingestion in the Dagster pipeline.
If you follow the code to the `run_csv_to_iceberg_meltano_job` function, you'll see that we've added a Dagster job to run Meltano.
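Here's a minimal sketch of what such a job can look like (the op name and the exact Meltano invocation are assumptions; the example project's implementation may differ):

```python
import subprocess

from dagster import job, op


@op
def run_csv_to_iceberg_meltano():
    # Shell out to the Meltano CLI from the /meltano directory,
    # just like running the ingestion manually.
    subprocess.run(
        ["meltano", "run", "tap-csv", "target-iceberg"],
        cwd="meltano",
        check=True,
    )


@job
def run_csv_to_iceberg_meltano_job():
    run_csv_to_iceberg_meltano()
```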
As you can see here, this is simply running the Meltano CLI inside a Dagster job.
In the example project, this is all we’ve done, but for your own data pipeline, you can easily add scheduling on top of this job to run it daily or hourly.
Finally, let’s run the Meltano ingestion job inside Dagster.
- Go to the Dagster dashboard at http://localhost:3000 and click on the `run_csv_to_iceberg_meltano_job` job.
- Go to the tab “Launchpad” and click “Launch Run”.
You'll see the job start running. It might take a couple of minutes to finish ingesting the ~100k rows of data we're using in our example project.
If the job runs successfully, you'll see the four tables written under Minio's `raw` prefix (which maps to the `raw` Trino schema).
Check it out in Minio by going to http://localhost:9000 and logging in with the username `admin` and password `admin_secret`.
Inspecting the Iceberg Tables with Trino
Once the data is inserted into the Iceberg tables, you can inspect them from inside the Trino shell.
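One way to open the Trino shell, assuming Trino is running as a Docker container named `trino` (the container name is an assumption; adjust it to your setup):

```sh
docker exec -it trino trino
```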
Once you're in the Trino shell, first you need to switch to the `iceberg` catalog and `raw` schema. This basically tells Trino we're using the `raw` schema inside the Iceberg data store.
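In the Trino shell, that looks like this:

```sql
USE iceberg.raw;
```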
Then list out the tables in that schema.
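```sql
SHOW TABLES;
```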
To view the data inside the table, you can run something like:
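```sql
-- counts the rows in the orders table
SELECT COUNT(*) FROM orders;
```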
This will show you the number of rows in the `orders` table.
Data Transformation
Now that we have the data in the Iceberg tables, we can start the transformation process to turn the raw data into a more useful form for analytics.
Above, we briefly showed you how we used our query engine Trino to query the data in the Iceberg tables.
Using the Trino shell directly is great for data exploration and ad hoc queries, but it's not suitable for an automated pipeline. For that, we connect DBT to Trino via the `dbt-trino` adapter so we can run our transformations inside Dagster (our orchestrator).
Now, let’s see how we can set up dbt and Trino to work together.
Here’s a quick reminder of where dbt and Trino are in the project structure.
DBT + Trino
DBT is a SQL data transformation tool that follows software engineering best practices.
It makes it easy to write complex SQL queries, but it cannot execute those SQL queries itself. This is where Trino comes in. Trino is a query engine that can execute our DBT SQL queries.
DBT provides a Trino adapter (`dbt-trino`) that allows us to run our DBT transformations on Trino easily.
This Trino connection is configured in `dbt/your_project/profiles.yml`:
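```yaml
# Illustrative sketch of profiles.yml -- the profile name, host, port, and
# threads values are assumptions; the catalog and schema follow the
# conventions used in this guide.
your_project:
  target: dev
  outputs:
    dev:
      type: trino           # use the dbt-trino adapter
      host: localhost
      port: 8080
      user: trino
      database: iceberg     # maps to the Trino catalog
      schema: project       # note the schema is "project"
      threads: 4
```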
Notice that the `schema` here is set to `project`.
In `dbt/your_project/dbt_project.yml`, you'll see the project configuration.
There's one more step required: we need to create the `raw` schema in Trino before we can run the DBT transformations. We do this by adding an `on-run-start` hook inside `dbt/your_project/dbt_project.yml`.
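A sketch of what that hook can look like (the exact SQL in the example project may differ, e.g. some catalog setups also require a schema location):

```yaml
# In dbt/your_project/dbt_project.yml
on-run-start:
  - "CREATE SCHEMA IF NOT EXISTS iceberg.raw"
```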
If you recall, `raw` is the Trino schema we loaded the Meltano data into (recall the `USE iceberg.raw;` command we ran inside the Trino shell). We previously set up the loader to load the data into the `raw` schema in the `iceberg` catalog.
Trino + Iceberg
Trino is a query engine (or “compute engine” to be more general).
But query engines need to know where the data is stored. If you want to dig up gold (our data), you need both the shovel (the compute engine, which is Trino) and the gold deposit (the storage, which is Iceberg + Minio).
In our case, the data is stored in Iceberg tables in Minio so we need to connect Trino with Iceberg somehow.
We've already set this up for you - if you look at the `trino` directory in the example project, you'll see the iceberg.properties file inside the /trino/etc/catalog directory. This is the file that connects Trino with Iceberg.
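As a rough sketch, the file names the connector and points it at an Iceberg catalog and the Minio storage (the values below are illustrative; the example project's actual catalog type and endpoints may differ):

```properties
connector.name=iceberg
# Example values only -- e.g. a REST catalog; the actual catalog settings and
# the Minio (S3) endpoint/credentials depend on the example project's setup.
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://localhost:8181
```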
To learn more about this, see the Trino Iceberg connector documentation.
Staging, Intermediate, and Marts
OK, now we’ve set up all the tools required to write our transformation queries in DBT.
We can start writing SQL queries in DBT now, but instead, let’s take a look at a helpful organizational pattern: Staging, Intermediate, and Marts.
This is a common pattern in data warehousing where you have three layers of transformations, progressively turning raw data into a more business-conformed form suitable for analytics.
This pattern not only makes our SQL code more modular and maintainable but also makes it easier to collaborate with others. With a common design pattern like this, everyone knows exactly what to expect.
- Staging: This is where we create atomic tables from the raw data. You can think of each of these tables as the most basic unit of data — an atomic building block we’ll later compose together to build more complex SQL queries. This makes our SQL code more modular.
- Intermediate: This is where we compose a bunch of atomic tables from the staging stage to create more complex tables. You can think of this stage as an intermediate step between the modular tables in the staging stage and the final, business-conformed tables in the marts stage.
- Marts: This is where you have the final, business-conformed data ready for analytics. Each mart is typically designed to be consumed by a specific function in the business, such as the finance team, marketing team, etc.
For more details, we highly recommend you check out DBT’s excellent Best Practices Guide.
OK, now let’s take a look at how we set up each stage in our example project.
Staging
In the `dbt/your_project/models` directory, you'll see the staging directory.
Inside, you'll find `stg_iceberg.yml`, where we specify the data sources for the staging tables as well as define the staging tables (or "models" in DBT terms) themselves.
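Here's an illustrative sketch of what that file can look like (the source name and the exact model list are assumptions):

```yaml
version: 2

sources:
  - name: iceberg_raw      # assumed source name
    database: iceberg      # maps to the Trino catalog
    schema: raw            # the schema Meltano loaded the data into
    tables:
      - name: orders
      - name: customers
      - name: products
      - name: stores

models:
  - name: stg_iceberg__orders
  - name: stg_iceberg__customers
  - name: stg_iceberg__products
  - name: stg_iceberg__stores
```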
Notice how we've specified in DBT that the data sources are in the `iceberg` database and the `raw` schema. This is because we've set up Trino to connect to Iceberg tables in the `raw` schema.
In other words, `database` here maps to the Trino `catalog`.
Staging tables are then defined under `models`.
DBT file naming convention matters!
Example DBT Model in Staging
We won't go through each table, but here's an example of a DBT model, `stg_iceberg__orders`.
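Here's an illustrative sketch of such a model (the column names, source name, and config options are assumptions):

```sql
-- Sketch of models/staging/stg_iceberg__orders.sql
{{
  config(
    materialized='table'
  )
}}

select
  id,
  customer_id,
  product_id,
  store_id,
  created_at
from {{ source('iceberg_raw', 'orders') }}
```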
I'm sure you recognize the SQL queries here. The part you might not be familiar with is the `config` block at the top.
This is actually a Jinja template that DBT uses to augment SQL code. In this case, we’re telling DBT how to materialize this model.
When we run `dbt build`, DBT will take this code and compile it into a valid SQL query that can be executed by Trino.
Adding Staging Model Configuration to dbt_project.yml
Finally, we need to add a bit of extra configuration. We do this in the `dbt_project.yml` file in the dbt directory.
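A sketch of what that extra configuration can look like (the materialization and schema values are assumptions; the example project may use different settings):

```yaml
models:
  your_project:
    staging:
      # Assumed settings: materialize staging models as tables in a dedicated
      # schema (e.g. project_staging, given the target schema "project").
      +materialized: table
      +schema: staging
```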
Deduping
There’s one more thing we should go over: deduping.
Because `target-iceberg` currently only supports the APPEND operation, you will probably want to dedupe the data during the `staging` stage of the DBT transformation.
This way you can make sure the ingestion process is idempotent (i.e. you can run it multiple times with the same result).
There are many ways to dedupe your data here, but let’s see how we did it in the example project.
Before we get to the code, let's enter the Trino shell and see how many rows are in the raw `orders` table (i.e. ingested from Meltano, before any transformation).
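For example, using a fully qualified table name:

```sql
SELECT COUNT(*) FROM iceberg.raw.orders;
```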
You'll see that we have 100k rows in the raw `orders` table as expected (since our source `orders.csv` has 100k rows).
Once we run the `staging` DBT models, we can see that the `stg_iceberg__orders` table also has 100k rows, as expected: the count query should return 100,000.
Now, if we run the Meltano ingestion job AGAIN in Dagster, we'll see that the `orders` table in Trino will have 200k rows, because `target-iceberg` APPENDs the data. The count query should now return 200,000.
Clearly we don’t want this to happen in our staging model since we don’t want any duplicate data.
To dedupe the data, we add a `row_num` column to the source raw `orders` table by using the `row_number()` window function in the `stg_iceberg__orders` model.
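Here's an illustrative sketch of that pattern (the column names, and the CTE names other than `deduped_and_renamed`, are assumptions):

```sql
with source as (
  select
    *,
    -- number the duplicate rows that share the same id
    row_number() over (partition by id) as row_num
  from {{ source('iceberg_raw', 'orders') }}
),

deduped_and_renamed as (
  select
    id,
    customer_id,
    product_id,
    store_id,
    created_at
  from source
  where row_num = 1   -- keep only one row per id
)

select * from deduped_and_renamed
```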
This code assigns a row number to rows that share the same `id`. For example, if we have two rows with the same `id`, one will get a `row_num` of 1 and the duplicate will get a `row_num` of 2.
Then, in the `deduped_and_renamed` CTE, we only select rows where `row_num` is 1. This effectively dedupes the data.
Now if you go back to the Trino shell and run the query to count the rows in the `stg_iceberg__orders` table, you'll see that it's still only 100k: the count query should again return 100,000.
We can do this for all tables in the staging area to make sure our ingestion process is idempotent at the `staging` stage.
Intermediate
Similar to the staging stage, we have the intermediate stage in the `dbt/your_project/models/intermediate` directory.
As with staging, you'll notice we specify the sources and models in the `int_iceberg.yml` file.
Then we add the model SQL file, `int_iceberg__denormalized_orders.sql`.
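Here's an illustrative sketch of what a denormalized orders model can look like (the column names and join keys are assumptions based on the example dataset):

```sql
with orders as (
    select * from {{ ref('stg_iceberg__orders') }}
),

customers as (
    select * from {{ ref('stg_iceberg__customers') }}
),

products as (
    select * from {{ ref('stg_iceberg__products') }}
),

stores as (
    select * from {{ ref('stg_iceberg__stores') }}
)

-- join the atomic staging tables into one wide, denormalized orders table
select
    orders.id as order_id,
    customers.id as customer_id,
    products.id as product_id,
    stores.id as store_id,
    orders.created_at
from orders
left join customers on orders.customer_id = customers.id
left join products on orders.product_id = products.id
left join stores on orders.store_id = stores.id
```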
Finally, in `dbt/your_project/dbt_project.yml`, we add the configuration for the intermediate stage.
Marts
Same deal with marts. We have the `dbt/your_project/models/marts` directory, where we specify the sources and models in the `mart_iceberg.yml` file.
Take note of the Marts naming convention.
Then we add the model SQL files. We’ll skip an example here, but you get the idea.
Finally, we add the configuration for the marts stage in `dbt/your_project/dbt_project.yml`.
Running DBT Transformations in Dagster
Now that we have the DBT transformations set up, let’s see how we can run them in Dagster.
Dagster has a deep DBT integration. It can infer the dependencies between all your DBT models and run them in the correct order. The visualization of this graph is also very helpful.
We add all DBT assets into Dagster in `dagster/your_project/your_project/dbt_assets.py`.
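A minimal sketch of what that file can look like (the paths and asset function name are assumptions), using the `dagster_dbt` integration to load the DBT project's models as assets:

```python
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# Path to the DBT project and its compiled manifest (paths are assumptions).
DBT_PROJECT_DIR = Path("dbt/your_project")
DBT_MANIFEST_PATH = DBT_PROJECT_DIR / "target" / "manifest.json"


@dbt_assets(manifest=DBT_MANIFEST_PATH)
def your_project_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run `dbt build` and stream each model's results back to Dagster as assets.
    yield from dbt.cli(["build"], context=context).stream()
```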
In the code above, we're using the `dagster_dbt` package, which the Dagster team created to import DBT models as Dagster "assets".
Once we've done that, we can add the DBT assets to the Dagster definitions in `dagster/your_project/your_project/__init__.py`.
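A minimal sketch of what those definitions can look like (import paths and names are assumptions):

```python
from dagster import Definitions
from dagster_dbt import DbtCliResource

from .dbt_assets import your_project_dbt_assets
# Assumed module for the Meltano ingestion job defined earlier.
from .meltano import run_csv_to_iceberg_meltano_job

defs = Definitions(
    assets=[your_project_dbt_assets],
    jobs=[run_csv_to_iceberg_meltano_job],
    resources={
        # Point the dbt CLI resource at the DBT project (path is an assumption).
        "dbt": DbtCliResource(project_dir="dbt/your_project"),
    },
)
```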
Now you should be able to see all DBT assets in the Global Asset Lineage section of the Dagster dashboard:
- Click on "Assets" in the top menu.
- Click on “View global asset lineage” at the top right.
In the global asset lineage view, click “Materialize all” to execute the DBT transformations (underneath, Trino is executing these DBT queries). Of course, in a production environment, you might want to schedule them instead of manually triggering them.
If the materialization was successful, you should see 3 analytics-ready tables under the `project_marts` prefix in Minio.
Data Visualization
OK, we have the data in the form we want for analytics!
Now we can visualize them using a tool like Superset.
We know that data analysts are particular about which tools they use here, so we've separated Superset out into its own directory so it can be easily replaced by another tool of your choice.
Let’s configure Superset to connect to Trino and visualize the data.
Superset + Trino
Go to the Superset dashboard at http://localhost:8088 and log in with the username `admin` and password `admin`.
You can change the username and password in the `docker-init.sh` file inside the `superset/docker` directory:
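```sh
# Relevant lines in superset/docker/docker-init.sh (a sketch -- the exact
# variable names in your copy may differ slightly).
ADMIN_PASSWORD="admin"

superset fab create-admin \
    --username admin \
    --firstname Superset \
    --lastname Admin \
    --email admin@superset.com \
    --password "$ADMIN_PASSWORD"
```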
Adding Trino as a Database Connection in Superset
Now we need to add Trino as a database connection in Superset.
- Go to the “Settings” dropdown at the top right corner and click “Database Connections”.
- Click on the “+Database” button at the top right corner.
- Find the "Trino" option in the "SUPPORTED DATABASES" select field near the bottom.
- In the "SQLALCHEMY URI" field, enter `trino://trino@host.docker.internal:8080/iceberg` and then click "Connect".
Trino should now be connected to Superset.
Adding an Example Dashboard
Now that we’ve connected Superset to Trino, let’s add an example dashboard.
- First download the example dashboard we created for you here.
- Go to the “Dashboards” tab and click on the Import Dashboard icon at the top-right corner.
- Upload the downloaded zip file and click “Import”.
- Find the example dashboard you just added in the list of dashboards and click on it to view it.
That’s it! You should now see a bunch of charts we created for you based on the example dataset.
Next Steps
Congratulations! You’ve just built an end-to-end data pipeline from scratch.
This is a simple example, but it’s a great starting point for building more complex data pipelines.
If you have any questions or need help, feel free to reach out to us in our Slack community.