Arthur,

That's an excellent point.

It touches on some ambiguity in how we conceptualize these two types of
workflows (task-driven and data-driven), so I think we should develop a
more concrete vocabulary for describing them. Here's my attempt.

We are all comfortable with a task-driven workflow; it is how Airflow
operates today. For completeness, though: tasks execute and are deemed
successful if they execute without raising an error. Otherwise, they are
considered failed. Tasks only execute when their upstream task dependencies
meet a set of criteria -- by default, all successful. Critically, tasks are
self-contained (meaning they are almost always atomic and often even
idempotent, though external side effects and XComs are allowed).
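
For concreteness, a minimal sketch of that style in today's API (the task
names and commands are just illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG('task_driven_example', start_date=datetime(2017, 1, 1),
              schedule_interval='@daily')

    # Each task is self-contained; nothing is passed between them.
    extract = BashOperator(task_id='extract', bash_command='echo extract',
                           dag=dag)
    load = BashOperator(task_id='load', bash_command='echo load', dag=dag)

    # 'load' runs only once 'extract' succeeds (the default all_success
    # trigger rule); it never sees extract's output.
    extract.set_downstream(load)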

Data-driven workflows, by contrast, make no claims about a task's state.
Instead, tasks consume or produce data and are chained to pass that data
along. To the extent it is required, a task can be considered successful if
it produces data and failed otherwise. A downstream task runs as soon as
upstream data is available. I think the most sophisticated examples are
Spark and Beam, which use distributed execution engines to efficiently
process data.
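
For contrast, here's roughly what that style looks like in Beam's Python
SDK (a toy pipeline, just to show that each step is defined by the data it
consumes and produces rather than by a success/failure state):

    import apache_beam as beam

    with beam.Pipeline() as p:
        (p
         | 'read' >> beam.Create(['raw_1', 'raw_2', 'raw_3'])   # produces data
         | 'clean' >> beam.Map(lambda rec: rec.upper())         # consumes and produces data
         | 'write' >> beam.io.WriteToText('/tmp/cleaned'))      # runs as data arrives

There is no notion of 'clean' succeeding or failing as a task; it simply
transforms whatever 'read' emits.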

To what degree do these two workflows overlap? I think it's just a matter
of what each style makes more or less convenient. A task-driven workflow
can always mimic a data-driven one by embedding serialization boilerplate
at the beginning and end of each task to pass data from task to task.
However, it is difficult to define that logic outside the simple linear
case (task A -> B -> C -> ...), though tools like XComs can facilitate it.
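
In today's Airflow that boilerplate looks something like the sketch below
(the payload has to stay small, since XComs are serialized into the
metadata database):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG('xcom_passing_example', start_date=datetime(2017, 1, 1))

    def produce(**context):
        data = {'rows': [1, 2, 3]}
        # Serialization boilerplate at the *end* of task A...
        context['ti'].xcom_push(key='payload', value=data)

    def consume(**context):
        # ...and matching boilerplate at the *start* of task B.
        data = context['ti'].xcom_pull(task_ids='task_a', key='payload')
        print(len(data['rows']))

    task_a = PythonOperator(task_id='task_a', python_callable=produce,
                            provide_context=True, dag=dag)
    task_b = PythonOperator(task_id='task_b', python_callable=consume,
                            provide_context=True, dag=dag)
    task_a.set_downstream(task_b)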

=========================

So with all that in mind, adding a data-driven approach to Airflow amounts
largely to saying that in addition to task B depending on task A completing
successfully, task B has an input parameter that is generated by task A.

1. Airflow's current task-driven workflow:
     Sensor --> DQ --> Operator

2. A new, completely data-driven workflow:
     Sensor (generates data) --> DQ (cleans the data) --> Operator (consumes the data)

3. A hybrid approach:
     Sensor (generates data) --[task dep]--> DQ --[task dep]--> Operator
        \                                                          /
         \---------------------- [data dep] ----------------------/
    In this approach, the Operator will only run when the DQ check is
successful AND it receives the data from the Sensor (sketched below).
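
To be explicit about what I'm imagining, the hybrid case might read
something like this. None of these classes or methods exist today; the
names are made up purely for illustration:

    # Purely hypothetical API -- nothing below exists in Airflow today.
    raw_file = sensor.output('raw_file')       # the sensor declares a named data output

    dq = DataQualityOperator(task_id='dq_check', dag=dag)
    process = ProcessOperator(task_id='process',
                              input_data=raw_file,   # implicit data dependency on the sensor
                              dag=dag)

    sensor.set_downstream(dq)      # explicit task dependency
    dq.set_downstream(process)     # explicit task dependency

    # 'process' runs only when dq_check has succeeded AND the sensor's
    # 'raw_file' output is actually available.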

To be honest, I don't see a strong reason not to use #1, the current
Airflow approach. As soon as data becomes conditional in some respect,
data-driven models break down because the conditional logic has to be
implemented alongside the data. A task-driven approach is more robust here.

However, a common case we have is to take a file from a remote location,
process it with one or more steps, and put it back in remote storage. I
would love to do this with at least 3 operators: one that uses a storage
hook to download the file, one (or more) that processes the file, and one
that uses a storage hook to reupload it. However, I can't do that because
there is no way to get the file passed down the dependency chain.
Consequently, we have many cases where what are logically 3 or more
operators get squeezed into a single massive operator (think:
BigQueryExtract_CleanData_ProcessData_UploadToGCS_Operator). Other times,
the operators remain distinct but each one has to bolt on storage hooks
(BigQueryToGCS -> DownloadGCS_CleanData_UploadGCS ->
DownloadGCS_ProcessData_UploadGCS), not to mention the complexity of
passing along the appropriate URIs via XCom.
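
Concretely, the workaround today ends up looking roughly like the sketch
below (the storage hook calls are elided and the names are hypothetical).
Note that passing a local path via XCom only works if every task lands on
a worker sharing the same filesystem, which is part of why things collapse
into one big operator in practice:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG('file_pipeline', start_date=datetime(2017, 1, 1))

    def download(**context):
        local_path = '/tmp/raw.csv'
        # e.g. some_storage_hook.download(remote_uri, local_path)
        context['ti'].xcom_push(key='path', value=local_path)

    def clean(**context):
        path = context['ti'].xcom_pull(task_ids='download', key='path')
        cleaned = path + '.cleaned'
        # ... transform the file on local disk, writing to `cleaned` ...
        context['ti'].xcom_push(key='path', value=cleaned)

    def upload(**context):
        path = context['ti'].xcom_pull(task_ids='clean', key='path')
        # e.g. some_storage_hook.upload(path, remote_uri)

    dl = PythonOperator(task_id='download', python_callable=download,
                        provide_context=True, dag=dag)
    cl = PythonOperator(task_id='clean', python_callable=clean,
                        provide_context=True, dag=dag)
    ul = PythonOperator(task_id='upload', python_callable=upload,
                        provide_context=True, dag=dag)

    dl.set_downstream(cl)
    cl.set_downstream(ul)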

This is a case where a data-driven workflow would really shine: a single
logical piece of data being used by multiple Operators in sequence. That's
true even if behind the scenes all it was doing was handling the remote
storage calls for me, which is how we've solved this problem by hand.
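
The hand-rolled version is essentially a wrapper that stages the remote
object to local disk before the real logic runs and pushes the result back
afterwards. Something like the sketch below, where storage_get/storage_put
stand in for whatever hook methods you actually use:

    import functools

    def with_remote_file(input_uri, output_uri, storage_get, storage_put):
        """Wrap a function of (local_in, local_out) so its I/O lives in remote storage."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(**context):
                local_in, local_out = '/tmp/in.dat', '/tmp/out.dat'
                storage_get(input_uri, local_in)       # stage the input locally
                func(local_in, local_out, **context)   # the actual business logic
                storage_put(local_out, output_uri)     # push the result back
            return wrapper
        return decorator

A data-aware Airflow could do exactly this plumbing for us.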

Lastly, it's possible to take this too far. A purely data-driven workflow,
one that maps an operation over a collection, is best handled by
Spark/Beam or another dedicated data system. If Airflow moves in this
direction, it should be as a matter of convenience, recognizing that some
atomic operations depend not just on upstream completion, but on upstream
results as well.

-- J






On Wed, Jan 25, 2017 at 1:04 PM Arthur Wiedmer <arthur.wied...@gmail.com>
wrote:

From our own data warehouse, there are definitely cases where knowing that
the data is there is not enough. While I agree that ideally the dependency
in data should be explicit, the current dependency engine allows you to
compress some of the data dependencies by using the task dependencies.


For instance, we sometimes use additional data quality checks before we
proceed:
Data Sensor ----> DQ check ----> operator

Rendering the dependency explicit would lead to this below:

Data ----> DQ check ----> operator
        \_________________/

This is not inherently bad, but I feel that the dependency is redundant. Of
course, the additional checks could be somehow encoded in the dependency,
but it does not feel as clean to me, especially if the data quality check
is resource intensive.

Here my dependency is not so much on the data being available as it is on
the data being of the quality I need.

Best,
Arthur

On Jan 25, 2017 7:09 AM, "Jeremiah Lowin" <jlo...@apache.org> wrote:

> At the simplest level, a data-dependency should just create an automatic
> task-dependency (since a task shouldn't run before required data is
> available). Therefore it should be possible to reason about dataflow using
> the existing dependency framework.
>
> Is there any reason that wouldn't hold for all dataflow scenarios?
>
> Then the only differentiation becomes whether a task-dependency was defined
> explicitly by a user or implicitly by a data-dependency.
>
> On Tue, Jan 24, 2017 at 11:23 AM Maxime Beauchemin <
> maximebeauche...@gmail.com> wrote:
>
> > I'm happy working on a design doc. I don't think Sankeys are the way to go
> > as they are typically used to show some metric (say number of users flowing
> > through pages on a website), and even if we'd have something like row count
> > throughout I don't think we'd want to make it that central to the
> > visualization.
> >
> > I think good old graphs are where it's at. Either overloading the current
> > graph view with extra options (current view untouched, current view +
> > lineage (a graph where nodes are tasks or data objects, data objects have
> > a different shape), lineage only view).
> >
> > On Mon, Jan 23, 2017 at 11:16 PM, Gerard Toonstra <gtoons...@gmail.com>
> > wrote:
> >
> > > data lineage is one of the things you mentioned in an early presentation
> > > and I was wondering about it.
> > >
> > > I wouldn't mind setting up an initial contribution towards achieving that,
> > > but would like to understand the subject a bit better. The easiest MVP is
> > > to use the annotations method to simply show how data flows, but you
> > > mention other things that need to be done in the third paragraph. If a
> > > wiki could be written on the subject, explaining why those things are
> > > done, we can set up a discussion and create an epic with jira issues to
> > > realize that.
> > >
> > > The way I think this can be visualized is perhaps through a sankey diagram,
> > > which helps to make complex systems more understandable, eg:
> > > - how is transaction margin calculated?  What is all the source data?
> > > - where does customer data go to and are those systems compliant?
> > > - what is the overall data dependency between systems and can these be
> > > reduced?
> > > - which data gets used everywhere?
> > > - which end systems consume from the most diverse sources of data?
> > >
> > > and other questions appropriate for data lineage.
> > >
> > > Rgds,
> > >
> > > Gerard
> > >
> > >
> > > On Tue, Jan 24, 2017 at 2:04 AM, Maxime Beauchemin <
> > > maximebeauche...@gmail.com> wrote:
> > >
> > > > A few other thoughts related to this. Early on in the project, I had
> > > > designed but never launched a feature called "data lineage annotations"
> > > > allowing people to define a list of sources, and a list of targets
> > > > related to each task for documentation purposes. My idea was to use a
> > > > simple annotation string that would uniquely map to a data object.
> > > > Perhaps a URI as in `{connection_type}://{conn_id}/{something_unique}`
> > > > or something to that effect.
> > > >
> > > > Note that operators could also "infer" lineage based on their input
> > > > (HiveOperator could introspect the HQL statement to figure out inputs
> > > > and outputs for instance), and users could override the inferred lineage
> > > > if so desired, either to abstract complexity like temp tables and such,
> > > > to correct bad inference (SQL parsing is messy), or in cases where
> > > > operators wouldn't implement the introspection functions.
> > > >
> > > > Throw a `data_object_exist(data_object_uri)` and a
> > > > `clear_data_object(data_object_uri)` method in existing hooks, add a
> > > > `BaseOperator.use_target_presence_as_state=False` boolean and some
> > > > handling of it in the dependency engine and while "clearing", and we're
> > > > not too far from a solution.
> > > >
> > > > As a more generic alternative, potentially task states could be handled
> > > > by a callback when so desired. For this, all we'd need to do is to add a
> > > > `status_callback(dag, task, task_instance)` callback to BaseOperator,
> > > > and evaluate it for state in place of the database state where users
> > > > specify.
> > > >
> > > > Max
> > > >
> > > > On Mon, Jan 23, 2017 at 12:23 PM, Maxime Beauchemin <
> > > > maximebeauche...@gmail.com> wrote:
> > > >
> > > > > Just commented on the blog post:
> > > > >
> > > > > ----------------------------
> > > > > I agree that workflow engines should expose a way to document the data
> > > > > objects they read from and write to, so that the engine can be aware of
> > > > > the full graph of tasks and data objects and how it all relates. This
> > > > > metadata allows for clarity around data lineage and potentially deeper
> > > > > integration with external systems.
> > > > >
> > > > > Now there's the question of whether the state of a workflow should be
> > > > > inferred based on the presence or absence of related targets. For this
> > > > > specific question I'd argue that the workflow engine needs to manage its
> > > > > own state internally. Here are a few reasons why:
> > > > >
> > > > > * many maintenance tasks don't have a physical output, forcing the
> > > > > creation of dummy objects representing state
> > > > > * external systems have no guarantees as to how quickly you can check
> > > > > for the existence of an object, therefore computing what task can run
> > > > > may put a burden on external systems, poking at thousands of data
> > > > > targets (related: the snakebite lib was developed in part to help with
> > > > > the Luigi burden on HDFS)
> > > > > * how do you handle the "currently running" state? a dummy/temporary
> > > > > output? manage this specific state internally?
> > > > > * how to handle a state like "skipped" in Airflow (related to
> > > > > branching)? creating a dummy target?
> > > > > * if you need to re-run parts of the pipeline (say a specific task and
> > > > > everything downstream for a specific date range), you'll need to go and
> > > > > alter/delete the presence of a potentially intricate list of targets.
> > > > > This means the workflow engine needs to be able to delete files in
> > > > > external systems as a way to re-run tasks. Note that you may not always
> > > > > want to take these targets offline for the duration of the backfill.
> > > > > * if some tasks are using staging or temporary tables, cleaning those up
> > > > > to regain space would re-trigger the task, so you'll have to trick the
> > > > > system into achieving what you want to do (overwriting with an empty
> > > > > target?), perhaps changing your unit of work by creating larger tasks
> > > > > that include the temporary table step, but that may not be the
> > > > > unit-of-work that you want
> > > > >
> > > > > From my perspective, to run a workflow engine at scale you need to
> > > > > manage its state internally because you need strong guarantees as to
> > > > > reading and altering that state. I agree that ideally the workflow
> > > > > engine should know about input and output data objects (this is not the
> > > > > case currently in Airflow), and it would be a real nice thing to be able
> > > > > to diff & sync state across its internal state and the external one
> > > > > (presence of targets), but that may be challenging.
> > > > >
> > > > > Max
> > > > >
> > > > > On Mon, Jan 23, 2017 at 8:05 AM, Bolke de Bruin <bdbr...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi All,
> > > > >>
> > > > >> I came by a write up of some of the downsides in current workflow
> > > > >> management systems like Airflow and Luigi
> > > > >> (http://bionics.it/posts/workflows-dataflow-not-task-deps) where they
> > > > >> argue dependencies should be between inputs and outputs of tasks
> > > > >> rather than between tasks (inlets/outlets).
> > > > >>
> > > > >> They extended Luigi (https://github.com/pharmbio/sciluigi) to do this
> > > > >> and even published a scientific paper on it:
> > > > >> http://jcheminf.springeropen.com/articles/10.1186/s13321-016-0179-6.
> > > > >>
> > > > >> I kind of like the idea, has anyone played with it, any thoughts? I
> > > > >> might want to try it in Airflow.
> > > > >>
> > > > >> Bolke
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
