Dear All,

Over the last few weeks I have made serious improvements to the lineage support in Airflow. While it is not complete, it is starting to take shape and I think it is worth sharing some thoughts and directions. Much of this has been discussed with several organisations, such as Polidea, Daily Motion and Lyft. Some have already implemented some lineage support themselves (Daily Motion) and some have a need for it (Lyft with Amundsen).
First a bit of a recap: what is lineage and why is it important? Lineage allows you to track the origins of data, what happens to it, and where it moves over time. Lineage is often associated with auditability of data pipelines, which is not a very sexy subject ;-). However, there are much more prominent and user facing improvements possible if you have lineage data available.

Lineage greatly simplifies tracing errors in analytics back to their root cause. So, instead of calling up the engineering team whenever there is a data error, a user could trace back to the origin of the data and contact whoever created the original data set. Lineage also greatly improves discoverability of data, and lineage information gives insight into the importance of data sets. A new employee joining a team would normally go to the most senior person in that team to ask which data sources they use and what they mean. If lineage information is exposed through a tool like Amundsen this is not required, because that person can just look it up.

To summarise, there are 3 use cases driving the need for lineage:

1. Discoverability of data
2. Improved data operations
3. Auditability of data pipelines

So that's all great, I hear you thinking, but why don't we have it in Airflow already if it is so important? The answer to that is twofold. Firstly, adding lineage information is often associated with a lot of metadata and meta programming. Typically, if lineage is being 'slapped on', one needs to add a lot of metadata which then needs to be kept in sync. In that way it does not solve a problem for the developer; rather, it creates one. Secondly, Airflow is a task based system and by definition does not have very good infrastructure for dealing with data. In the past we had some trials by Jeremiah to add Pipelines, but it was never integrated and I think it actually sparked him to start Prefect ;-) (correct me if I am wrong if you are reading this, Jeremiah).

Where is lineage support now in Airflow? In the 1.10.X series there is some support for lineage, but it is buggy and difficult to use as it is based on the metadata model of Apache Atlas. In master the foundation has much improved (but it is not fully done yet). You can now set inlets and outlets with lightweight objects like File(url="http://www.google.com") and Table(name="my_table") and the lineage system in Airflow will figure out a lot for you. You can also have inlets pick up outlets from upstream tasks by passing a list of task_ids, or even by using "AUTO", which picks up the outlets of the direct upstream tasks. The lightweight objects are automatically templated, so you can do something like File(url="/tmp/my_data_{{ execution_date }}") and it does the right thing for you. Templating inlets and outlets gives very powerful capabilities, for example a Task that, based on the inlets it receives, drops PII information from an arbitrary table and writes the result somewhere else. This allows for creating generic Tasks/DAGs that can be re-used without any domain knowledge. A small example (not PII) is available with the example_papermill_operator. Lineage information is exposed through an API endpoint: you can query "/api/experimental/lineage/<dag_id>/<execution_date>" and you will get a list of tasks with their inlets and outlets defined.
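To make this a bit more concrete, here is a minimal sketch of what it could look like in a DAG on master. Take the import paths and the operator choice with a grain of salt (the entities have moved around between 1.10.X and master), the point is just to show the shape of it:

    from datetime import datetime

    from airflow import DAG
    from airflow.lineage import AUTO
    from airflow.lineage.entities import File  # lived under airflow.lineage.datasets in 1.10.X
    from airflow.operators.bash_operator import BashOperator

    default_args = {"start_date": datetime(2019, 1, 1)}

    with DAG("lineage_example", default_args=default_args, schedule_interval="@daily") as dag:

        # Lightweight, templated objects: every run points at its own file url.
        raw = File(url="/tmp/my_data_{{ execution_date }}")
        clean = File(url="/tmp/my_data_clean_{{ execution_date }}")

        extract = BashOperator(
            task_id="extract",
            bash_command="echo extracting",
            outlets=[raw],
        )

        # AUTO makes this task pick up the outlets of its direct upstream
        # task(s) as its inlets; a list of task_ids or entities also works.
        transform = BashOperator(
            task_id="transform",
            bash_command="echo transforming",
            inlets=AUTO,
            outlets=[clean],
        )

        extract >> transform

After a run you would then hit the experimental endpoint, e.g. GET /api/experimental/lineage/lineage_example/<execution_date>, and get back the tasks with their inlets and outlets resolved.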
The lineage information shared through the API and the lightweight object model are very close to the model used within Lyft's Amundsen, so when that gets proper visualisation support for lineage and pulls in the information from Airflow, it's presto! Other systems might require some translation, but that shouldn't be too hard.

What doesn't it do? Well, and here we get to the point of this discussion, there is still meta programming involved to keep the normal parameters of an operator and its inlets and outlets in sync. This is because it is hard to make operators lineage aware without changing them. So while you can set "inlets" and "outlets" on an Operator, the operator itself doesn't do anything with them, which makes them a lot less powerful. Actually, the only operator with out of the box support for lineage is the PapermillOperator.

In discussions with the aforementioned organisations it became clear that, while we could change all the operators that Airflow ships with out of the box, this will not help with the many custom operators that are around. They will simply not get updated as part of this exercise, leaving them as technical debt. Thus we need an approach that works with the past and improves the future. The generic pattern for Airflow operators is pretty simple: you can read many (yes, we know there are exceptions!) as SourceToTarget(src_conn_id, src_xxx, src_xx, target_conn_id, target_xxx, some_other_kwarg). Hence, we came up with the following.

For existing non lineage aware operators:

1. Use wrapper objects to group parameters together as inlet or as outlet. For example, usage for the MysqlToHiveTransfer could look like MysqlToHiveTransfer(Inlet(mysql_conn_id='mysql_conn', sql='select * from table'), Outlet(hive_cli_conn_id='hive_conn', hive_table='my_hive_table')). The wrapper objects would then set the right kwargs on the Operator and create the lineage information. This resolves the issue of keeping parameters in sync (see the sketch below this list for how it could work).

2. Use the builder pattern to tell the lineage system which arguments to the operator are for the Inlet and which for the Outlet, maybe with a type hint if required. E.g. MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table', hive_cli_conn_id='hive_conn', hive_table='hive_table').inlet('mysql_conn_id', {'sql': 'mysql'}).outlet('hive_cli_conn_id', 'hive_table'). This requires a bit more work from the developer as the parameter names need to be kept in sync; however, these are slow moving.

For future lineage aware operators:

1. Update the Operator to set and support inlets and outlets itself, e.g. like the current PapermillOperator.

2. Have a dictionary inside the operator which tells the lineage system which fields are used for inlet and which for outlet. This is the integrated version of pattern 2 for non lineage aware operators:

       # dictionary of parameter name with type
       inlet_fields = {'mysql_conn_id': 'mysql_connection', 'sql': 'sql'}
       outlet_fields = {'hive_conn_id': 'hive_connection', 'hive_table': 'table'}

   Updates to the operator need to be checked to ensure the field names are kept in sync.

3. Enforce a naming pattern for Operators, so that MysqlToHiveTransfer(…) becomes MysqlToHive(mysql_conn_id, mysql_sql, hive_conn_id, hive_table) or MysqlToHive(src_conn_id, src_sql, target_conn_id, target_table). This would allow the lineage system to figure out what is an inlet and what is an outlet based on the naming scheme alone. It would require a pylint plugin to make sure Operators behave correctly, but it would also make operators much more predictable.
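To make option 1 for existing operators a bit more tangible, here is a purely hypothetical sketch of how the wrapper objects could work. Neither Inlet nor Outlet exists in Airflow today, and a real implementation would hook into BaseOperator rather than a free-standing helper; the point is only to show that the wrappers can both forward the kwargs to the unchanged operator and collect the lineage metadata in one go:

    # Hypothetical sketch -- these classes are not part of Airflow (yet).
    class LineageWrapper:
        """Groups operator kwargs and remembers whether they describe an inlet or an outlet."""

        direction = None

        def __init__(self, **kwargs):
            self.kwargs = kwargs


    class Inlet(LineageWrapper):
        direction = "inlets"


    class Outlet(LineageWrapper):
        direction = "outlets"


    def unwrap(*wrappers, **extra_kwargs):
        """Flatten wrappers into plain operator kwargs and collect lineage info."""
        kwargs = dict(extra_kwargs)
        lineage = {"inlets": [], "outlets": []}
        for wrapper in wrappers:
            kwargs.update(wrapper.kwargs)           # the operator still gets its usual kwargs
            lineage[wrapper.direction].append(wrapper.kwargs)  # lineage is derived for free
        return kwargs, lineage


    # Usage as in the proposal; the operator itself stays unchanged because it
    # still receives mysql_conn_id, sql, hive_cli_conn_id and hive_table as before.
    kwargs, lineage = unwrap(
        Inlet(mysql_conn_id="mysql_conn", sql="select * from table"),
        Outlet(hive_cli_conn_id="hive_conn", hive_table="my_hive_table"),
        task_id="mysql_to_hive",
    )

In a real integration BaseOperator (or a decorator around it) would do this unwrapping transparently, so the DAG author only ever writes the MysqlToHiveTransfer(Inlet(...), Outlet(...)) form from the proposal.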
Option number 3 for the future has the most impact. Out of the box, the lineage system in Airflow can support all of the above patterns (and it is my intention to make it do so), but ideally we improve the state of things so that what we do for non lineage aware operators can be deprecated in the future: the wrapper objects and the builder pattern wouldn't be necessary anymore.

What do you think? What are your thoughts on lineage, and what kind of usage do you foresee? How would you like to use it and have it supported in Airflow? Would you be able to work with the above ways of doing it? Pros and cons?

Thanks
Bolke