Hi everyone!

I think the whole data lineage proposal is great, and I would like to 
contribute my own thoughts on how to extend the Operators API for better 
lineage support.

Lately, I've been experimenting with extending the Operator API to make it 
more `functional`: specifying data dependencies and piping data across the 
DAG. My approach is backwards compatible, and it separates how you specify 
operator arguments from the Inlets/Outlets, which are generated dynamically. 
I used XCom as a simple mechanism to pass dynamic values around.

My proposal is to include a __call__ method that dynamically replaces class 
attributes before the `pre_execute` and `execute` functions run. Combined 
with XComArg, a class that points to the XCom value pushed by a previous 
task, this allowed me to define DAGs in a more functional style. In short, 
the proposal is to do the following (a sketch follows the list):


• Add a __call__ method to BaseOperator that accepts Inlets (in my case, 
XComArgs)
• Log their values at execution time (which would allow exposing a REST API 
like the one proposed before)
• Resolve them before running the main `execute` function
• Set the corresponding attributes on the operator instance
• Execute the operator and return an XComArg that can later be passed to a 
downstream operator as an Inlet…
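
To make the mechanism concrete, here is a rough sketch of the two pieces 
(simplified and illustrative; the class and attribute names here are made 
up and the actual experimental code in the repo differs in the details):

from airflow.models import BaseOperator


class XComArg:
    # Points at the XCom value pushed by an upstream task.
    def __init__(self, operator, key='return_value'):
        self.operator = operator
        self.key = key

    def __getitem__(self, item):
        # lets e.g. splitted_ds['train'] point at a value pushed under a key
        return XComArg(self.operator, key=item)

    def resolve(self, context):
        return context['ti'].xcom_pull(task_ids=self.operator.task_id,
                                       key=self.key)


# Shown on a subclass here; the proposal is to put this on BaseOperator.
class LineageAwareOperator(BaseOperator):
    def __call__(self, **kwargs):
        # Remember which attributes are fed by upstream XComs (the Inlets)
        # and wire up the task dependencies they imply.
        self._call_kwargs = kwargs
        for value in kwargs.values():
            if isinstance(value, XComArg):
                self.set_upstream(value.operator)
        return XComArg(self)

    def pre_execute(self, context):
        # Resolve XComArgs and set the attributes before execute() runs.
        # This is also the place to log their values for a lineage API.
        for name, value in self._call_kwargs.items():
            if isinstance(value, XComArg):
                value = value.resolve(context)
            setattr(self, name, value)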


Here’s what it would look like (ML example, sorry):

with DAG(...) as dag:
    load = LoadDatasetOperator(task_id='load_dataset')
    split = SplitTrainTestOperator(task_id='split', test_perc=0.3)
    train = TrainTensorflowModelOperator(task_id='train')
    validate = PrecisionRecallOperator(task_id='pr')
    report = EmailOperator(task_id='send_pr_report',
                           subject='New model trained results',
                           email='[email protected]')

    dataset = load(path='hdfs://some/dataset')
    splitted_ds = split(dataset=dataset)
    model = train(dataset=splitted_ds['train'],
                  model_specification='hdfs://some/dataset')
    metrics = validate(model=model, dataset=splitted_ds['test'])
    report(html_content=metrics)

As someone wise once said, code is better than words, so here's my 
experimental code: https://github.com/casassg/corrent (ignore the awful name 
and the injection part).

Gerard Casas Saez
Twitter | Cortex | @casassaez
On Jan 22, 2020, 8:40 PM -0700, Tao Feng wrote:
> Thanks Bolke. For those that are not aware, my team is working with Bolke's
> team on Amundsen, a data discovery and metadata project
> (https://github.com/lyft/amundsen). I think that although it ships with an
> Atlas client (or it used to), the new API, per my understanding, is generic
> enough that it isn't tied to Atlas. E.g. we (Lyft) could build a neo4j /
> Amundsen client in our Airflow fork to ingest the lineage info in a push
> fashion to build the lineage.
>
> Amundsen itself has put in the effort to integrate Airflow with the tool
> (connecting which DAG/task produces which data set, etc.). With this change,
> I foresee it will help provide more enriched metadata.
>
> Thanks,
> -Tao
>
> On Wed, Jan 22, 2020 at 8:46 AM Dan Davydov <[email protected]>
> wrote:
>
> > Just want to preface my reply with the fact that I haven't thought about
> > data lineage very much.
> >
> > This is an awesome idea :)! I like something like 1) personally, e.g.
> > operators could optionally define a .outlet() and .inlet() interface which
> > would return the inlets and outlets of a given task, and then it's up to
> > the operator how it wants to set these inlets/outlets like the Papermill
> > operator currently does. This also allows inlets/outlets to be more dynamic
> > (e.g. in the case of an operator that might generate inlets/outlets
> > dynamically at execution time). Seems the most extensible/least coupling.
> > IMO we should strive to make DAGs easy to create with little boilerplate,
> > but this is a lot less important for operators since they are a lot more
> > stable and change less frequently, so it's fine to require operators to
> > implement some interface manually.
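> >
> > A rough sketch of what such an interface could look like (purely
> > illustrative; the operator, field names and import paths are made up):
> >
> > from airflow.models import BaseOperator
> > from airflow.lineage.entities import File, Table
> >
> > class MyTransferOperator(BaseOperator):
> >     def inlet(self):
> >         # could be derived from the operator's own arguments, or computed
> >         # dynamically at execution time
> >         return [File(url=self.src_path)]
> >
> >     def outlet(self):
> >         return [Table(name=self.target_table)]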
> >
> > On Wed, Jan 22, 2020 at 8:33 AM Bolke de Bruin <[email protected]> wrote:
> >
> > > Dear All,
> > >
> > > Over the last few weeks I have made serious improvements to the lineage
> > > support that Airflow has. Whilst not complete, it's starting to shape up
> > > and I think it is good to share some thoughts and directions. Much has
> > > been discussed with several organisations like Polidea, Daily Motion and
> > > Lyft. Some have already implemented some support for lineage themselves
> > > (Daily Motion) and some have a need for it (Lyft with Amundsen).
> > >
> > > First, a bit of a recap: what is lineage and why is it important? Lineage
> > > allows you to track the origins of data, what happens to it, and where it
> > > moves over time. Lineage is often associated with auditability of data
> > > pipelines, which is not a very sexy subject ;-). However, many more
> > > prominent and user-facing improvements are possible if you have lineage
> > > data available. Lineage greatly simplifies the ability to trace errors
> > > back to their root cause in analytics. So, instead of the user calling up
> > > the engineering team in case of a data error, they could trace back to
> > > the origin of the data and call whoever created the original data set.
> > > Lineage also greatly improves discoverability of data. Lineage
> > > information gives insights into the importance of data sets. So if new
> > > employees join a team, they would normally go to the most senior person
> > > in that team to ask which data sources they use and what their meaning
> > > is. If lineage information is exposed through a tool like Amundsen, this
> > > is not required because they can just look it up.
> > >
> > > To summarise, there are 3 use cases driving the need for lineage:
> > >
> > > 1. Discoverability of data
> > > 2. Improved data operations
> > > 3. Auditability of data pipelines
> > >
> > > So that's all great, I hear you thinking, but why don't we have it in
> > > Airflow already if it is so important? The answer to that is twofold.
> > > Firstly, adding lineage information is often associated with a lot of
> > > metadata and meta programming. Typically, if lineage is being 'slapped
> > > on', one needs to add a lot of metadata which then needs to be kept in
> > > sync. That way it does not solve a problem for the developer; rather, it
> > > creates one. Secondly, Airflow is a task-based system and by definition
> > > does not have very good infrastructure for dealing with data. In the past
> > > we had some trials by Jeremiah to add Pipelines, but it was never
> > > integrated and I think it actually sparked him to start Prefect ;-)
> > > (correct me if I am wrong if you are reading this, Jeremiah).
> > >
> > > Where is lineage support now in Airflow? In the 1.10.X series there is
> > > some support for lineage, but it is buggy and difficult to use as it is
> > > based on the metadata model of Apache Atlas. In master the foundation has
> > > much improved (but it is not fully done yet). You can now set inlets and
> > > outlets with lightweight objects like File(url="http://www.google.com")
> > > and Table(name="my_table") and the lineage system in Airflow will figure
> > > out a lot for you. You can also have inlets pick up outlets from upstream
> > > tasks by passing a list of task_ids, or even by using "AUTO", which picks
> > > up outlets from direct upstream tasks.
> > >
> > > The lightweight objects are automatically templated, so you can do
> > > something like File(url="/tmp/my_data_{{ execution_date }}") and it does
> > > the right thing for you. Templating inlets and outlets gives very
> > > powerful capabilities, for example creating a Task that, based on the
> > > inlets it receives, can drop PII information from an arbitrary table and
> > > output this table somewhere else. This allows for creating generic
> > > Tasks/DAGs that can be re-used without any domain knowledge. A small
> > > example (not PII) is available with the example_papermill_operator.
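> > >
> > > For illustration, a minimal usage sketch (the exact import paths and the
> > > bash commands are placeholders and may differ on current master):
> > >
> > > from airflow.lineage import AUTO
> > > from airflow.lineage.entities import File, Table
> > > from airflow.operators.bash_operator import BashOperator
> > >
> > > extract = BashOperator(
> > >     task_id='extract',
> > >     bash_command='...',
> > >     outlets=[File(url='/tmp/my_data_{{ execution_date }}')],
> > > )
> > > load = BashOperator(
> > >     task_id='load',
> > >     bash_command='...',
> > >     inlets=AUTO,  # pick up outlets from direct upstream tasks
> > >     outlets=[Table(name='my_table')],
> > > )
> > > extract >> load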
> > >
> > > Lineage information is exposed through an API endpoint. You can query
> > > "/api/experimental/lineage/<dag_id>/<execution_date>" and you will get a
> > > list of tasks with their inlets and outlets defined. The lineage
> > > information shared through the API and the lightweight object model are
> > > very close to the model used within Lyft's Amundsen, so when that gets
> > > proper visualisation support for lineage and pulls in the information
> > > from Airflow, it's presto! Other systems might require some translation,
> > > but that shouldn't be too hard.
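> > >
> > > For example (the base URL, dag_id and execution_date here are
> > > placeholders, and authentication depends on your deployment):
> > >
> > > import requests
> > >
> > > resp = requests.get(
> > >     'http://localhost:8080/api/experimental/lineage/'
> > >     'my_dag/2020-01-01T00:00:00+00:00'
> > > )
> > > print(resp.json())  # tasks with their inlets and outlets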
> > >
> > > What doesn't it do? Well, and here we get to the point of this
> > > discussion, there is still meta programming involved to keep the normal
> > > parameters and the inlets and outlets of an operator in sync. This is
> > > because it's hard to make operators lineage aware without changing them.
> > > So while you can set "inlets" and "outlets" on an Operator, the operator
> > > itself doesn't do anything with them, making them a lot less powerful.
> > > Actually, the only operator that has out-of-the-box support for lineage
> > > is the PapermillOperator.
> > >
> > > In discussions with the aforementioned organisations it became clear
> > that,
> > > while we could change all operators that Airflow comes out of the box
> > with,
> > > this will not help with the many custom operators that are around. They
> > > will simply not get updated as part of this exercise, leaving them as
> > > technical debt. Thus we need an approach that works with the past and
> > > improves the future. The generic pattern for Airflow operators is pretty
> > > simple: you can read many (yes we know there are exceptions!) as
> > > SourceToTarget(src_conn_id, src_xxx, src_xx, target_conn_id, target_xxx,
> > > some_other_kwarg). Hence, we came up with the following:
> > >
> > > For existing non-lineage-aware operators:
> > >
> > > 1. Use wrapper objects to group parameters together as inlet or as
> > > outlet. For example, usage for the MysqlToHiveTransfer could look like
> > > MysqlToHiveTransfer(Inlet(mysql_conn_id='mysql_conn', sql='select * from
> > > table'), Outlet(hive_cli_conn_id='hive_conn', hive_table='my_hive_table')).
> > > The wrapper objects would then set the right kwargs on the Operator and
> > > create the lineage information. This resolves the issue of keeping
> > > parameters in sync.
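> > >
> > > A minimal sketch of how such wrappers could work (illustrative only,
> > > nothing like this exists yet):
> > >
> > > class Inlet:
> > >     # Groups operator kwargs and remembers that they describe the source.
> > >     def __init__(self, **kwargs):
> > >         self.kwargs = kwargs
> > >
> > > class Outlet(Inlet):
> > >     # Same, but for the target.
> > >     pass
> > >
> > > # BaseOperator (or a metaclass) would then unpack the wrappers into the
> > > # normal constructor kwargs and register the grouped parameters as
> > > # lineage inlets/outlets, so nothing has to be kept in sync by hand.
> > >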
> > > 2. Use the builder pattern to tell the lineage system which arguments to
> > > the operator are for the Inlet and which for the Outlet, maybe with a
> > > type hint if required. E.g.
> > > MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table',
> > > hive_cli_conn_id='hive_conn',
> > > hive_table='hive_table').inlet('mysql_conn_id', {'sql':
> > > 'mysql'}).outlet('hive_cli_conn_id', 'hive_table')
> > > This requires a bit more work from the developer, as the parameter names
> > > need to be kept in sync. However, they are slow moving.
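> > >
> > > Sketched out, the builder methods could simply record the parameter names
> > > on the task (illustrative only):
> > >
> > > class LineageBuilderMixin:
> > >     def inlet(self, *fields):
> > >         # each field is a parameter name, or a {name: type_hint} dict
> > >         self._lineage_inlet_fields = fields
> > >         return self
> > >
> > >     def outlet(self, *fields):
> > >         self._lineage_outlet_fields = fields
> > >         return self
> > >
> > > Returning self keeps the calls chainable in the DAG definition.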
> > >
> > > Future lineage-aware operators:
> > >
> > > 1. Update the Operator to set and support inlets and outlets itself, e.g.
> > > like the current PapermillOperator.
> > > 2. Have a dictionary inside the operator which tells the lineage system
> > > which fields are used for inlet and outlet. This is the integrated
> > > version of pattern 2 for non-lineage-aware operators:
> > > # dictionary of parameter name with type
> > > inlet_fields = {'mysql_conn_id': 'mysql_connection', 'sql': 'sql'}
> > > outlet_fields = {'hive_conn_id': 'hive_connection', 'hive_table':
> > > 'table'}
> > > Updates to the operator need to be checked to ensure the field names are
> > > kept in sync.
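> > >
> > > The lineage system could then resolve these declarations against the
> > > operator instance, roughly like this (illustrative sketch):
> > >
> > > def collect_lineage_fields(operator):
> > >     inlets = {name: getattr(operator, name)
> > >               for name in getattr(operator, 'inlet_fields', {})}
> > >     outlets = {name: getattr(operator, name)
> > >                for name in getattr(operator, 'outlet_fields', {})}
> > >     return inlets, outlets
> > >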
> > > 3. Enforce a naming pattern for Operators, so that
> > > MysqlToHiveTransfer(…) becomes
> > > MysqlToHive(mysql_conn_id, mysql_sql, hive_conn_id, hive_table) or
> > > MysqlToHive(src_conn_id, src_sql, target_conn_id, target_table).
> > > This would allow the lineage system to figure out what is inlet and what
> > > is outlet based on the naming scheme. It would require a pylint plugin to
> > > make sure Operators behave correctly, but it would also make operators
> > > much more predictable.
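> > >
> > > With the generic prefixes, the classification could be roughly as follows
> > > (illustrative):
> > >
> > > def classify_kwargs(kwargs):
> > >     inlets = {k: v for k, v in kwargs.items() if k.startswith('src_')}
> > >     outlets = {k: v for k, v in kwargs.items() if k.startswith('target_')}
> > >     return inlets, outlets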
> > >
> > > Option number 3 for the future has the most impact. Out of the box the
> > > lineage system in Airflow can support all the above patterns (and it is
> > > my intention to do so), but ideally we improve the state of things so
> > > that we can deprecate what we do for non-lineage-aware operators in the
> > > future: wrapper objects and the builder pattern wouldn't be necessary
> > > anymore.
> > >
> > > What do you think? What are your thoughts on lineage, what kind of usages
> > > do you foresee? How would you like to be using it and have it supported
> > in
> > > Airflow? Would you be able to work with the above ways of doing it? Pros
> > > and cons?
> > >
> > > Thanks
> > > Bolke
> > >
> >
