Getting Started Guide
Welcome to our Beta Program! This document contains the basic information you will need to start sending lineage data to Datakin. We are developing new features and learning new things every day, so this documentation is almost certainly incomplete. When you run into issues, please let us know so we can improve our onboarding experience.
Sections:
- Apache Airflow
- Apache Spark
Apache Airflow
Airflow, a common tool used for workflow management, is the foundation of many data pipelines. Enabling OpenLineage in Apache Airflow automatically tracks metadata about jobs and datasets as DAGs execute.
Installation
Adding Datakin metadata collection to existing Airflow DAGs was designed to be a straightforward process. The integration is implemented as an extension of the airflow Python module and is developed as part of the Marquez open source lineage repository. The new module, marquez-airflow, can be used in the same way as the original.
$ pip3 install marquez-airflow
Note: You can also add marquez-airflow to your requirements.txt for Airflow.
Configuration
marquez-airflow needs to know where to find the Marquez server API. You can configure this using environment variables read by your Airflow service.
You should set MARQUEZ_NAMESPACE to match the name of your Airflow instance. If you have more than one Airflow instance, the namespace is used to differentiate them.
MARQUEZ_BACKEND=HTTP
MARQUEZ_URL=https://api.mydomain.datakin.com
MARQUEZ_API_KEY=[YOUR_API_KEY]
MARQUEZ_NAMESPACE=my_special_ns
Usage
To begin collecting Airflow DAG metadata with Marquez, update your DAGs with this single line change:
- from airflow import DAG
+ from marquez_airflow import DAG
When enabled, the library will:
- On DAG start, collect metadata for each task using an Extractor matched to the task's operator (if no matching extractor exists, the library uses a default extractor)
- Collect task input / output metadata (source, schema, etc.)
- Collect task run-level metadata (execution time, state, parameters, etc.)
- On DAG completion, mark each task as complete in Marquez
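For example, a minimal DAG using the drop-in import might look like the following sketch; the operator, connection id, table names, and schedule here are illustrative and are not part of the integration itself.

from datetime import datetime

from airflow.operators.postgres_operator import PostgresOperator
from marquez_airflow import DAG  # drop-in replacement for airflow.DAG

dag = DAG(
    dag_id='example_orders_dag',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    description='Loads order totals; task metadata is reported to Marquez.')

# Metadata for this task (inputs, outputs, run state) is collected by the
# extractor registered for the operator, or by the default extractor.
load_order_totals = PostgresOperator(
    task_id='load_order_totals',
    postgres_conn_id='orders_db',
    sql='INSERT INTO order_totals SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id;',
    dag=dag)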
To enable logging, set the environment variable MARQUEZ_LOG_LEVEL to DEBUG, INFO, or ERROR:
$ export MARQUEZ_LOG_LEVEL=INFO
Example
An example of this integration can be found in the open source Marquez GitHub repository.
Apache Spark
Spark is a popular, distributed data processing framework used for large dataset analytics and machine learning. Enabling OpenLineage in Spark jobs enables tracking lineage of jobs and datasets across Spark jobs, as well as giving visibility into the dependencies within a Spark application.
Spark jobs typically run on clusters of machines. A single machine hosts the “driver” application, which constructs a graph of jobs (e.g., reading data from a source, filtering, transforming, and joining records, and writing results to some sink) and manages execution of those jobs. Spark’s fundamental abstraction is the Resilient Distributed Dataset (RDD), which encapsulates distributed reads and modifications of records. While RDDs can be used directly, it is far more common to work with Spark Datasets or Dataframes, APIs that add explicit schemas for better performance and the ability to interact with datasets using SQL. The Dataframe’s declarative API enables Spark to optimize jobs by analyzing and manipulating an abstract query plan prior to execution.
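To make the distinction concrete, here is a small PySpark sketch contrasting the two APIs; the paths and column names are hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd_vs_dataframe").getOrCreate()

# RDD API: records are opaque to Spark, so there is no query plan to optimize.
lines = spark.sparkContext.textFile("s3a://my-bucket/events/")
rdd_error_count = lines.filter(lambda line: "ERROR" in line).count()

# Dataframe API: an explicit schema and declarative operations let Spark build
# and optimize an abstract query plan before any records are processed.
events = spark.read.option("header", True).csv("s3a://my-bucket/events/")
df_error_count = events.filter(events.level == "ERROR").count()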
Collecting Lineage in Spark
Collecting lineage requires hooking into Spark’s ListenerBus in the driver application and collecting and analyzing execution events as they happen. Both raw RDD and Dataframe jobs post events to the listener bus during execution. These events expose the structure of the job, including the optimized query plan, allowing the Spark integration to determine the datasets a job consumes and produces, along with storage attributes such as location in GCS or S3, table names in a relational database or warehouse such as Redshift or BigQuery, and schemas. In addition to dataset and job lineage, Spark SQL jobs also report logical plans, which can be compared across runs to track changes that may affect the correctness or speed of a job.
A single Spark application may execute multiple jobs. The Spark OpenLineage integration maps one Spark job to a single OpenLineage Job. The application is assigned a Run id at startup, and each job that executes reports the application’s Run id as its parent job run. Thus, an application that reads one or more source datasets, writes an intermediate dataset, then transforms that intermediate dataset and writes a final output dataset will report three jobs: the parent application job, the initial job that reads the sources and creates the intermediate dataset, and the final job that consumes the intermediate dataset and produces the final output.
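As a rough sketch of that scenario (dataset paths and column names are hypothetical), an application like the one below would report the parent application run plus one job for each stage that writes a dataset:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("enrich_orders").getOrCreate()

# First job: read two source datasets and write an intermediate dataset.
orders = spark.read.parquet("gs://my-bucket/orders/")
customers = spark.read.parquet("gs://my-bucket/customers/")
orders.join(customers, "customer_id") \
    .write.mode("overwrite").parquet("gs://my-bucket/enriched_orders/")

# Second job: transform the intermediate dataset and write the final output.
spark.read.parquet("gs://my-bucket/enriched_orders/") \
    .groupBy("order_date").count() \
    .write.mode("overwrite").parquet("gs://my-bucket/daily_order_counts/")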
How to Use the Integration
Adding OpenLineage metadata collection to existing Spark jobs was designed to be straightforward and unobtrusive to the application. The Spark integration works either as a javaagent or as a SparkListener.
SparkListener
The SparkListener approach is very simple and covers most cases. The listener analyzes events as they are posted by the SparkContext and extracts the job and dataset metadata exposed by the RDD and Dataframe dependency graphs. Most data sources, such as filesystem sources (including S3 and GCS), JDBC backends, and warehouses such as Redshift and BigQuery, can be analyzed and reported in this way.
spark-submit
The listener can be enabled by adding the following configuration to a spark-submit command:
spark-submit --conf "spark.extraListeners=marquez.spark.agent.SparkListener" \\ --packages "io.github.marquezproject:marquez-spark:0.15.2" \\ --conf "openlineage.host=http://" \\ --conf "openlineage.namespace=my_job_namespace" \\ --class com.mycompany.MySparkApp my_application.jar
Additional configuration can be set if applicable:
Configuration Key         | Description                                                                        | Default
openlineage.parentJobName | The job name of the parent job that triggered this Spark application              |
openlineage.parentRunId   | The Run id of the parent job run that triggered this Spark application            |
openlineage.apiKey        | The API key used to authenticate with the OpenLineage server that collects events |
openlineage.version       | The API version of the OpenLineage specification                                  | 1
spark-defaults.conf
Alternatively, the same configuration parameters can be added to the spark-defaults.conf file on cluster creation. Add the following key/value pairs to spark-defaults.conf:
spark.jars.packages     io.github.marquezproject:marquez-spark:0.15.2
spark.extraListeners    marquez.spark.agent.SparkListener
openlineage.host        http://<host>
openlineage.namespace   my_job_namespace
The optional keys defined above can also be added here. When the job is submitted, additional or overriding configuration values can be supplied. For example, openlineage.host and openlineage.namespace can be defined in the spark-defaults.conf file, while openlineage.parentRunId and openlineage.parentJobName can be supplied when the job is submitted by the parent job.
Javaagent
The Javaagent approach is the earliest approach to adding lineage events. It is aimed at instrumenting Spark code directly by manipulating bytecode at runtime. For Dataframe or RDD code that doesn’t expose the underlying datasource directly, the javaagent can inject bytecode at runtime to expose the required information. This approach requires being able to add the marquez-spark jar on the driver host and to set the correct JVM startup parameters, which may not be possible, e.g., on a serverless Spark platform such as AWS Glue.
spark-submit
The following configuration must be added to the spark-submit command when the job is submitted:
spark-submit --conf spark.driver.extraJavaOptions=-javaagent:marquez-spark-<version>.jar=http://<host>/api/v1/namespaces/<namespace>/?api_key=<api_key>
If a parent job run is triggering the Spark job run, the parent job’s name and run ID can be included as such:
spark-submit --conf spark.driver.extraJavaOptions=-javaagent:marquez-spark-<version>.jar=http://<host>/api/v1/namespaces/<namespace>/jobs/<parent_job_name>/runs/<parent_run_id>?api_key=<api_key>
From Airflow
The same parameters passed to spark-submit can be supplied from Airflow and other schedulers. If you use the marquez-airflow integration, each task in the DAG has its own Run id, which can be connected to the Spark job run via the openlineage.parentRunId parameter. For example, here is a DataProcPySparkOperator that submits a PySpark application to Dataproc:
t1 = DataProcPySparkOperator(
    task_id=job_name,
    gcp_conn_id='google_cloud_default',
    project_id='project_id',
    cluster_name='cluster-name',
    region='us-west1',
    main='gs://bucket/your-prog.py',
    job_name=job_name,
    dataproc_pyspark_properties={
        "spark.extraListeners": "marquez.spark.agent.SparkListener",
        "spark.jars.packages": "io.github.marquezproject:marquez-spark:0.15.+",
        "openlineage.url": f"{marquez_url}/api/v1/namespaces/{marquez_namespace}/jobs/dump_orders_to_gcs/runs/{{{{task_run_id(run_id, task)}}}}?api_key={api_key}"
    },
    dag=dag)
The same job can be submitted using the javaagent approach:
t1 = DataProcPySparkOperator(
    task_id=job_name,
    gcp_conn_id='google_cloud_default',
    project_id='project_id',
    cluster_name='cluster-name',
    region='us-west1',
    main='gs://bucket/your-prog.py',
    job_name=job_name,
    dataproc_pyspark_properties={
        'spark.driver.extraJavaOptions': f"-javaagent:marquez-spark-0.15.0.jar={marquez_url}/api/v1/namespaces/{marquez_namespace}/jobs/dump_orders_to_gcs/runs/{{{{task_run_id(run_id, task)}}}}?api_key={api_key}"
    },
    files=["https://repo1.maven.org/maven2/io/github/marquezproject/marquez-spark/0.15.0/marquez-spark-0.15.0.jar"],
    dag=dag)
Additional Resources
For documentation on the Spark data collection integration, visit the Marquez GitHub repository:
https://github.com/MarquezProject/marquez/tree/main/integrations/spark