Dagster as your Data Orchestrator

2023-09-07

What is Dagster?

Dagster is an open-source, Python-based data orchestrator that makes building and managing data pipelines easier. It also provides powerful monitoring, testing, and error-handling capabilities. Instead of being task-centric, Dagster takes a declarative, asset-based approach: it encourages thinking about the nature of the assets a pipeline produces (a CSV file, a BigQuery table, a machine learning model, etc.), which makes it well suited to today’s modern data stack.

With assets as its primary abstraction, Dagster enables a modern way of managing data that makes it easier to trust, organize, and change.

What makes Dagster powerful

At Digitl Cloud, we use Dagster to orchestrate most of our data pipelines. Besides its data-centric approach, here are the primary reasons why we consider it a powerful tool:

Modularity and reusability: Dagster enables building custom assets, jobs, and scheduling strategies. The code that describes the logic behind any asset can be easily shared and reused across different pipelines, facilitating faster development and reducing duplication of effort.
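For instance, here is a minimal sketch of how shared loading logic might be reused across several assets; the helper function, file paths, and asset names are illustrative rather than taken from a real project:

import pandas as pd
from dagster import asset

# Hypothetical shared helper: the same loading logic is reused by several assets
# instead of being duplicated in each pipeline.
def load_csv(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@asset
def customers() -> pd.DataFrame:
    return load_csv("data/customers.csv")

@asset
def orders() -> pd.DataFrame:
    return load_csv("data/orders.csv")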

Testing and validation: With Dagster, data pipelines can be run locally before they are merged, catching problems before they cause issues in production. Testing pipelines locally allows faster iteration and, most importantly, reduces the cost of test runs. Additionally, it is possible to run pipelines inside a unit test, enabling more effective testing.
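As a minimal sketch of what this can look like, assets can be materialized in-process inside a unit test with Dagster's materialize helper; the toy assets below are placeholders:

from dagster import asset, materialize

@asset
def raw_numbers() -> list:
    return [1, 2, 3]

@asset
def doubled_numbers(raw_numbers) -> list:
    return [n * 2 for n in raw_numbers]

def test_assets_materialize():
    # Runs both assets in-process, without any deployed infrastructure.
    result = materialize([raw_numbers, doubled_numbers])
    assert result.success
    assert result.output_for_node("doubled_numbers") == [2, 4, 6]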

Code ergonomics: An asset-based approach helps lower the complexity of the code. Instead of specifying every dependency explicitly, Dagster infers dependencies from the asset definitions. This is illustrated in the example section below.

dbt integration: Within the same workflow, Dagster supports dbt alongside other technologies like Python and Spark. Dagster’s asset concept makes it easy to define assets that rely on a specific dbt model. Here are a couple of practical examples to illustrate this, followed by a short code sketch after the list:

  1. After data ingestion into the data warehouse, dbt models can be seamlessly executed.
  2. There is the flexibility to selectively materialize both the dbt models themselves and their associated dependencies.
  3. Visualizing and orchestrating a graph of dbt assets is very straightforward. They can also be triggered using a single dbt command.
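As a rough illustration of the integration, recent versions of the dagster-dbt library expose a dbt_assets decorator that turns every model in a dbt project into a Dagster asset; the manifest path below is an assumption about the project layout:

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# The manifest path is project-specific and shown only for illustration.
@dbt_assets(manifest="my_dbt_project/target/manifest.json")
def my_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Runs `dbt build` and streams events back to Dagster, one asset per dbt model.
    yield from dbt.cli(["build"], context=context).stream()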

Observability and monitoring: Dagster provides built-in tools for monitoring and observing pipeline runs. It captures rich metadata during execution, making it possible to track lineage, inspect inputs and outputs, and visualize pipeline behavior. This helps with debugging, troubleshooting, and ensuring data quality.
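For example, an asset can attach metadata to its output, which Dagster records for every materialization and surfaces in the UI; this is a small sketch with illustrative data:

import pandas as pd
from dagster import MetadataValue, Output, asset

@asset
def cleaned_articles():
    # Illustrative data; in practice this would come from an upstream asset.
    df = pd.DataFrame({"title": ["a", "b"], "source": ["BBC News", "Google News"]})
    # Attaching metadata makes row counts and schema visible per materialization.
    return Output(
        df,
        metadata={
            "row_count": len(df),
            "columns": MetadataValue.json(list(df.columns)),
        },
    )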

Scalability and parallelism: Dagster supports parallel and distributed execution of pipelines, making it scalable for large data volumes and compute-intensive workloads. It can leverage compute clusters, containerization, and cloud platforms to execute pipeline tasks efficiently in parallel. In our case, Dagster pipelines are packaged as Docker images and run as Kubernetes Jobs. The approach involves defining a Dagster job that encapsulates the pipeline logic and dependencies; a Kubernetes Job is then created that references the Dagster image and launches the job on a Kubernetes cluster. This makes it possible to leverage Kubernetes features like scaling, fault tolerance, and resource allocation.
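One way to express part of such a setup in code, sketched here with the dagster-k8s library's k8s_job_executor (an illustration under assumed names, not our exact configuration), is to let each step of a job run in its own Kubernetes job:

from dagster import Definitions, asset, define_asset_job
from dagster_k8s import k8s_job_executor

@asset
def heavy_transformation():
    # Placeholder for a compute-intensive step.
    ...

# With the k8s_job_executor, each step is launched as its own Kubernetes job,
# so scaling, fault tolerance, and resource allocation are handled by the cluster.
heavy_job = define_asset_job("heavy_job", selection="heavy_transformation")

defs = Definitions(
    assets=[heavy_transformation],
    jobs=[heavy_job],
    executor=k8s_job_executor,
)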

Example of pipeline implementation using Dagster

To see how a data pipeline can be implemented with just a few lines of code, let's walk through these concepts with a hands-on example.

In this example, the first step involves loading news articles related to Artificial Intelligence using the News API, followed by the creation of specific tables related to news articles published by Google News and BBC News.

Implementing the pipeline

The first step of the pipeline is to get all news articles related to Artificial Intelligence and sort them by popularity. These news articles will then be used as input to create both the BBC News and Google News tables.

import os
import pandas as pd
import requests
from dagster import asset

@asset
def ai_news_articles() -> dict:
    response = requests.get(
        "https://newsapi.org/v2/everything",
        params={
            "q": "Artificial Intelligence",
            "sortBy": "popularity",
            "language": "en",
            "apiKey": os.environ.get("API_KEY"),
        },
    )
    response.raise_for_status()
    return response.json()

# ai_news_articles is added as input for the next assets, defining a DAG automatically.
@asset
def google_news_ai_articles(ai_news_articles) -> pd.DataFrame:
    df = pd.json_normalize(ai_news_articles["articles"])
    google_news_articles = df[df["source.name"] == "Google News"]
    return google_news_articles

@asset
def bbc_news_ai_articles(ai_news_articles) -> pd.DataFrame:
    df = pd.json_normalize(ai_news_articles["articles"])
    bbc_news_articles = df[df["source.name"] == "BBC News"]
    return bbc_news_articles

This code demonstrates how to create a software-defined asset using the @asset decorator.

The ai_news_articles asset represents a dictionary of news article data from all possible sources.

In the next stage of the pipeline, ai_news_articles is used as input for the following assets to filter and keep only articles published by "Google News" and "BBC News". Dependencies between assets are defined by adding the asset name as a parameter to the dependent asset's function. In this case, ai_news_articles (all published articles) is a dependency of google_news_ai_articles and bbc_news_ai_articles. As you may have noticed, the output of ai_news_articles is a Python dictionary, while google_news_ai_articles and bbc_news_ai_articles are both pandas DataFrames.

Exploring the Dagster UI (Dagit)

With Dagster's UI, it is possible to visualize data assets, manually launch runs and materializations, and observe what happens during pipeline runs.

Materializing a software-defined asset means creating or updating it. Dagster does this by executing the asset's function or triggering an integration. The resulting assets can be stored in various locations, such as databases or cloud storage. Dagster simplifies interacting with these different destinations through a built-in capability known as I/O managers: the objects responsible for saving and loading assets, as well as the outputs of operations. If an asset has no configured I/O manager, it is saved to the local file system.
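As a minimal sketch, an I/O manager can be configured through a Definitions object; here Dagster's built-in FilesystemIOManager is pointed at a custom directory (the base_dir value and module layout are illustrative):

from dagster import Definitions, FilesystemIOManager

# Assumes the three assets from the example above live in a local module.
from assets import ai_news_articles, bbc_news_ai_articles, google_news_ai_articles

defs = Definitions(
    assets=[ai_news_articles, google_news_ai_articles, bbc_news_ai_articles],
    resources={
        # Without this, Dagster falls back to its default filesystem I/O manager.
        "io_manager": FilesystemIOManager(base_dir="/tmp/dagster_assets"),
    },
)

The UI shown in the next step can then be started locally with the dagster dev command, pointing it at the file that contains this Definitions object.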

In this example, clicking the “Materialize all” button will trigger the pipeline run. This can be monitored using the UI.

As can be seen in the screenshot above, the progress of the triggered run can be monitored in real time through the printed logs. Since Dagster's default I/O manager is file-system based, all the materialized assets have been saved as local objects.

In conclusion, this example provides a high-level overview of Dagster's capabilities. However, Dagster offers much more, including the ability to leverage I/O managers for tailored integration with a wide range of data warehouse stacks. Additionally, pipeline runs can be enriched with additional metadata, such as Markdown, for improved documentation and collaboration. The flexibility and extensibility of Dagster open up many possibilities for optimizing data pipelines and unlocking valuable insights from data.

