Dear All,

Over the last few weeks I have made serious improvements to the lineage
support that Airflow has. Whilst not complete, it is starting to shape up
and I think it is good to share some thoughts and directions. Much has been
discussed with several organisations like Polidea, Daily Motion and Lyft.
Some have already implemented some support for lineage themselves (Daily
Motion) and some have a need for it (Lyft with Amundsen).

First a bit of a recap. What is lineage and why is it important? Lineage
allows you to track the origins of data, what happens to it and where it
moves over time. Lineage is often associated with auditability of data
pipelines, which is not a very sexy subject ;-). However, there are much
more prominent and user facing improvements possible if you have lineage
data available. Lineage greatly simplifies the ability to trace errors in
analytics back to their root cause. So, instead of calling up the
engineering team in case of a data error, the user could trace back to the
origin of the data and contact the person who created the original data
set. Lineage also greatly improves discoverability of data. Lineage
information gives insights into the importance of data sets. A new employee
joining a team would normally go to the most senior person in that team to
ask which data sources they are using and what their meaning is. If lineage
information is exposed through a tool like Amundsen this is not required,
because that person can just look it up.

To summarise, there are 3 use cases driving the need for lineage:

1. Discoverability of data
2. Improved data operations
3. Auditability of data pipelines

So that's all great I hear you thinking, but why don't we have it in
Airflow already if it is so important? The answer to that is twofold.
Firstly, adding lineage information is often associated with a lot of
metadata and meta programming. Typically, if lineage is being 'slapped on',
one needs to add a lot of metadata which then needs to be kept in sync. In
that way it does not solve a problem for the developer; rather, it creates
one. Secondly, Airflow is a task based system and by definition does not
have very good infrastructure for dealing with data. In the past we had
some trials by Jeremiah to add Pipelines, but it was never integrated and I
think it actually sparked him to start Prefect ;-) (correct me if I am
wrong if you are reading this, Jeremiah).

Where is lineage support now in Airflow? In the 1.10.X series there is some
support for lineage, but it is buggy and difficult to use as it is based on
the metadata model of Apache Atlas. In master the foundation has much
improved (but is not fully done yet). You can now set inlets and outlets
with lightweight objects like File(url="http://www.google.com") and
Table(name="my_table") and the lineage system in Airflow will figure out a
lot for you. You can also have inlets pick up outlets from previous
upstream tasks by passing a list of task_ids, or even by using "AUTO",
which picks up outlets from the direct upstream tasks.
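
To make this concrete, a minimal sketch of how that could look in a DAG
(the tasks and datasets are made up, and the exact import locations may
still change):

    from datetime import datetime

    from airflow import DAG
    from airflow.lineage import AUTO
    from airflow.lineage.entities import File, Table
    from airflow.operators.bash_operator import BashOperator

    with DAG("lineage_example", start_date=datetime(2019, 1, 1),
             schedule_interval="@daily") as dag:

        # the upstream task declares what it produces
        extract = BashOperator(
            task_id="extract",
            bash_command="echo extracting",
            outlets=[Table(name="my_table")],
        )

        # AUTO picks up the outlets of the direct upstream tasks as inlets
        load = BashOperator(
            task_id="load",
            bash_command="echo loading",
            inlets=AUTO,
            outlets=[File(url="/tmp/my_table_dump")],
        )

        extract >> load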

The lightweight objects are automatically templated, so you can do
something like File(url="/tmp/my_data_{{ execution_date }}") and it will do
the right thing for you. Templating inlets and outlets gives very powerful
capabilities, for example a Task that, based on the inlets it receives,
drops PII information from an arbitrary table and outputs that table
somewhere else. This allows for creating generic Tasks/Dags that can be
re-used without any domain knowledge. A small example (not PII) is
available with the example_papermill_operator.
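
A tiny sketch of such a generic task (the script name is made up); both
the command and the lineage objects render from the same template, so
there is no extra metadata to keep in sync:

    from airflow.lineage.entities import File
    from airflow.operators.bash_operator import BashOperator

    # 'dag' defined as before; anonymise.sh is a hypothetical script
    anonymise = BashOperator(
        task_id="anonymise",
        bash_command="anonymise.sh /tmp/my_data_{{ execution_date }} "
                     "/tmp/my_data_clean_{{ execution_date }}",
        inlets=[File(url="/tmp/my_data_{{ execution_date }}")],
        outlets=[File(url="/tmp/my_data_clean_{{ execution_date }}")],
        dag=dag,
    )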

Lineage information is exposed through an API endpoint. You can query
“/api/experimental/lineage/<dag_id>/<execution_date>” and you will get a
list of tasks with their inlets and outlets defined. The lineage
information shared through the API and the lightweight object model are
very close to the model used within Lyft’s Amundsen so when that gets
proper visualisation support for lineage and pulls in the information from
Airflow it’s presto! Other systems might require some translation but that
shouldn’t be too hard.
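
For example, pulling the lineage of a run could look something like this
(host, dag_id and execution date are made up):

    import requests

    # experimental API, so no stability guarantees on the response format
    resp = requests.get(
        "http://localhost:8080/api/experimental/lineage/"
        "my_dag/2019-10-01T00:00:00+00:00"
    )

    # a list of tasks with their inlets and outlets
    print(resp.json())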

What doesn't it do? Well, and here we get to the point of this discussion,
there is still meta programming involved to keep the normal parameters and
the inlets and outlets of an operator in sync. This is because it's hard to
make operators lineage aware without changing them. So while you can set
"inlets" and "outlets" on an Operator, the operator itself doesn't do
anything with them, making them a lot less powerful. Actually, the only
operator that has out of the box support for lineage is the
PapermillOperator.

In discussions with the aforementioned organisations it became clear that,
while we could change all the operators that Airflow ships with out of the
box, this will not help with the many custom operators that are around.
They will simply not get updated as part of this exercise, leaving them as
technical debt. Thus we need an approach that works with the past and
improves the future. The generic pattern for Airflow operators is pretty
simple: you can read most of them (yes, we know there are exceptions!) as
SourceToTarget(src_conn_id, src_xxx, src_xx, target_conn_id, target_xxx,
some_other_kwarg). Hence, we came up with the following:

For existing non lineage aware operators:

1. Use wrapper objects to group parameters together as inlet or as outlet.
For example, usage for the MysqlToHiveTransfer could look like
MysqlToHiveTransfer(Inlet(mysql_conn_id='mysql_conn', sql='select * from
table'), Outlet(hive_cli_conn_id='hive_conn', hive_table='my_hive_table')).
The wrapper objects would then set the right kwargs on the Operator and
create the lineage information. This resolves the issue of keeping
parameters in sync (a rough sketch follows after this list).
2. Use the builder pattern to tell the lineage system which arguments to
the operator are for the Inlet and for the Outlet, maybe with a type hint
if required. E.g.
MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table',
hive_cli_conn_id='hive_conn',
hive_table='hive_table').inlet('mysql_conn_id', {'sql':
'mysql'}).outlet('hive_cli_conn_id', 'hive_table')
This requires a bit more work from the developer as the parameter names
need to be kept in sync. However, parameter names are slow moving.
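
To make option 1 a bit more tangible, here is a very rough sketch of what
such wrapper objects could do (none of this exists yet; names and behaviour
are up for discussion):

    class Inlet:
        """Groups operator kwargs that together describe an input dataset."""
        def __init__(self, **kwargs):
            self.kwargs = kwargs


    class Outlet(Inlet):
        """Groups operator kwargs that together describe an output dataset."""


    # sketch of what BaseOperator could do with such positional arguments:
    # unwrap them into normal kwargs and record them as lineage in one go,
    # so every parameter is specified exactly once
    def unwrap_lineage_args(args, kwargs, inlets, outlets):
        for arg in args:
            if isinstance(arg, Outlet):  # check the subclass first
                kwargs.update(arg.kwargs)
                outlets.append(arg.kwargs)
            elif isinstance(arg, Inlet):
                kwargs.update(arg.kwargs)
                inlets.append(arg.kwargs)
        return kwargs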

Future lineage aware operators:

1. Update the Operator to set and support inlets and outlets itself, e.g.
like the current PapermillOperator.
2. Have a dictionary inside the operator which tells the lineage system
which fields are used for inlet and outlet. This is the integrated version
of pattern 2 for non lineage aware operators (a sketch follows after this
list):
    # dictionary of parameter name with type
    inlet_fields = {'mysql_conn_id': 'mysql_connection', 'sql': 'sql'}
    outlet_fields = {'hive_conn_id': 'hive_connection', 'hive_table': 'table'}
Updates to the operator need to be checked to ensure the field names are
kept in sync.
3. Enforce a naming pattern for Operators like
    MysqlToHiveTransfer(…) becomes
MysqlToHive(mysql_conn_id, mysql_sql, hive_conn_id, hive_table) or
    MysqlToHive(src_conn_id, src_sql, target_conn_id, target_table)
This would allow the lineage system to figure out what is an inlet and what
is an outlet based on the naming scheme. It would require a pylint plugin
to make sure Operators behave correctly, but it would also make operators
much more predictable.
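
As a sketch of how option 2 above could look from the lineage system's
point of view (the derive function is purely illustrative, nothing of this
is implemented):

    from airflow.models import BaseOperator


    class MysqlToHiveTransfer(BaseOperator):
        # declares which constructor arguments describe inputs and outputs,
        # and of which type they are
        inlet_fields = {'mysql_conn_id': 'mysql_connection', 'sql': 'sql'}
        outlet_fields = {'hive_conn_id': 'hive_connection',
                         'hive_table': 'table'}


    # the lineage system could then derive inlets and outlets generically
    # for any operator that declares these dictionaries
    def derive_lineage(operator):
        inlets = {name: getattr(operator, name)
                  for name in operator.inlet_fields}
        outlets = {name: getattr(operator, name)
                   for name in operator.outlet_fields}
        return inlets, outlets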

Option number 3 for the future has the most impact. Out of the box the
lineage system in Airflow can support (and it's my intention to do so) all
of the above patterns, but ideally we improve the state of things so that
we can deprecate what we do for non lineage aware operators in the future:
wrapper objects and the builder pattern wouldn't be necessary anymore.

What do you think? What are your thoughts on lineage, and what kinds of
usage do you foresee? How would you like to use it and have it supported in
Airflow? Would you be able to work with the above ways of doing it? Pros
and cons?

Thanks
Bolke
