TL;DR

I am sharing my thoughts here on supporting in-memory data passing. This
is not necessarily tied to an Airflow-specific implementation, but
Airflow still plays its role, i.e., chaining computational nodes.

In data-flow graph parlance, Airflow operators are fundamentally
computational nodes, and edges connect the nodes. In the context of
Airflow, as per my understanding, the edges are purely symbolic at this
time (meaning no explicit data flow happens along them), XComs
notwithstanding. As it stands, Airflow takes no opinion (rightly so) on
how this data flow happens. Every node reads/writes data to some
persistent or transient location, and downstream nodes have access to
it. A symbolic DAG is created, and the underlying runtimes execute the
computational nodes. Airflow, at this time, has no capability to handle
data in-memory between nodes, as it is simply a symbolic DAG.
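
To make the current state concrete, here is a minimal sketch of how data
typically moves between two tasks today (the operator names, callables,
and staging path are made up for illustration): each task persists its
output to some external location and the downstream task reads it back,
while the edge itself carries no data.

# Minimal sketch of today's pattern: the edge is symbolic, data moves via
# external storage. Paths and callables below are illustrative only.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

STAGING_PATH = "/tmp/audio_features.txt"  # hypothetical shared location


def extract(**_):
    # write results to a persistent/transient location
    with open(STAGING_PATH, "w") as f:
        f.write("features go here")


def train(**_):
    # the downstream task reads the same location back
    with open(STAGING_PATH) as f:
        _features = f.read()


with DAG("symbolic_edge_example",
         start_date=datetime(2019, 11, 1),
         schedule_interval=None) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="train", python_callable=train)
    t1 >> t2  # purely symbolic dependency; no data flows along this edge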

Extending Airflow to handle in-memory data flow could be done by
specifying an attribute on the edge (for example, "in-memory" is a
property of the edge connecting two nodes). The Airflow DAG is still a
symbolic DAG, except that dependencies carry additional attributes.
Airflow can let the underlying runtime engine attempt to fulfill this
promise.
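
Below is a purely hypothetical sketch of what annotating an edge could
look like; Airflow has no such in-memory edge attribute today, and the
EDGE_HINTS convention is invented here for illustration only.

# Purely hypothetical: Airflow has no in-memory edge attribute today.
# The DAG stays symbolic; the hint is just a promise for a capable runtime.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

with DAG("inmem_edge_example",
         start_date=datetime(2019, 11, 1),
         schedule_interval=None) as dag:
    load = PythonOperator(task_id="load_audio", python_callable=lambda: None)
    featurize = PythonOperator(task_id="featurize",
                               python_callable=lambda: None)

    load >> featurize  # today: a symbolic dependency, nothing more

# Invented convention, not an Airflow API: record which edges should be
# fulfilled in memory, for a runtime/super-operator that understands it.
EDGE_HINTS = {("load_audio", "featurize"): {"in_memory": True}}

A runtime that does not understand the hint simply falls back to the
normal persisted hand-off, so the DAG stays valid Airflow either way.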
There can be at least two ways:
1) The Airflow DAG has these special edges, and I bring up a Spark
cluster with Apache Livy as the bridge (it can run Python or Scala code
at this time). Data is referenceable via namespaces: data is held in
memory in a session, and Airflow operators are invoked in the remote
Spark cluster. This was my plan for doing data science declaratively,
and even for specifying which data flows should happen in-memory. In
this sample DAG spec
<https://github.com/dhavala/dagger/blob/master/src/dagger/examples/SampleDag_02.yaml>,
I used Unix-like pipes to denote in-memory edges and the normal Airflow
">" to denote non-opinionated data flows. A rough sketch of the Livy
bridge appears right after this list.
2) Dask <https://dask.org/> (for Python) does in-memory processing, so
any sub-DAGs having in-memory edges can be handled by a Dask operator.
It is up to Dask to fulfill the in-memory processing; a minimal Dask
sketch appears at the end of this note.
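
Here is the rough sketch of the Livy bridge mentioned in 1). It is a
simplification (the Livy host, the PySpark snippets, and the polling are
illustrative only): an in-memory edge becomes two statements executed
against the same long-lived Livy session, so the intermediate data stays
in the Spark cluster's memory between them.

# Rough sketch of the Livy bridge; error handling is omitted and the
# endpoint/code strings are illustrative only.
import time

import requests

LIVY = "http://livy-host:8998"  # hypothetical Livy endpoint


def wait_until_idle(session_url):
    # poll until the Spark session is ready to accept statements
    while requests.get(session_url).json()["state"] != "idle":
        time.sleep(5)


# one long-lived session == one in-memory namespace shared by the tasks
session = requests.post(LIVY + "/sessions", json={"kind": "pyspark"}).json()
session_url = "{}/sessions/{}".format(LIVY, session["id"])
wait_until_idle(session_url)

# task A: load data and keep it cached under a name in the session
requests.post(session_url + "/statements",
              json={"code": "df = spark.read.parquet('/data/audio'); df.cache()"})

# task B: a downstream operator reuses the same session, so 'df' is still
# in memory -- this is what the in-memory edge promises (Livy runs the
# statements of a session sequentially)
requests.post(session_url + "/statements",
              json={"code": "features = df.groupBy('speaker').count()"})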
So, bottom line: Airflow will not take any opinion on how the data flow
happens. An in-memory edge is only a future promise. If there is a
supporting super-operator (like Livy with Spark) or Dask, that promise
can get fulfilled.
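
For 2), here is the minimal sketch of what such a Dask super-operator
might do internally (the callables and the scheduler address are made
up): the sub-DAG with in-memory edges is submitted to Dask as a task
graph, so the intermediates are handed between steps in cluster memory
and only the final result comes back to the Airflow worker.

# Minimal sketch of fulfilling in-memory edges with Dask; the callables
# and scheduler address are hypothetical.
from dask.distributed import Client


def load_audio():
    return ["raw audio held in memory"]


def featurize(audio):
    return ["features({})".format(a) for a in audio]


def train(features):
    return "model trained on {} examples".format(len(features))


def run_inmem_subdag(scheduler_address="tcp://dask-scheduler:8786"):
    # the whole sub-DAG with in-memory edges runs inside Dask; futures
    # passed as arguments are resolved in cluster memory, not on disk
    client = Client(scheduler_address)
    audio = client.submit(load_audio)
    features = client.submit(featurize, audio)  # in-memory hand-off
    model = client.submit(train, features)      # in-memory hand-off
    return model.result()

Inside the Airflow DAG, run_inmem_subdag() would back a single task (the
Dask super-operator) standing in for the whole in-memory sub-DAG.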

thanks,
-soma





On Tue, Nov 26, 2019 at 9:37 PM James Meickle
<jmeic...@quantopian.com.invalid> wrote:

> I think this idea is running before we can even crawl. Before it makes any
> sense to implement this in Airflow, I think it needs three other things:
>
> - A reliable, well-designed component for passing data between tasks first
> (not XCom!); where shared memory is an _implementation_ of data passing
> - An understanding of temporary resources (not encoded as explicit DAG
> steps but stood up/torn down implicitly); where the shared memory _is_ a
> temporary resource
> - An understanding of cooperative scheduling and retrying (what if one half
> fails but the other half is still running?); where this is required to use
> shared memory safely without subtle race conditions
>
> And as stated, this is easy-ish on local executor and crushingly hard with
> anything else. Yet in the cases where you need this, you... probably don't
> want to be running on local executor.
>
> On Tue, Nov 26, 2019 at 6:22 AM Jarek Potiuk <jarek.pot...@polidea.com>
> wrote:
>
> > *TL;DR; Discuss whether shared memory data sharing for some tasks is an
> > interesting feature for future Airflow.*
> >
> > I had a few discussions recently with several Airflow users (including at
> > Slack [1] and in person at Warsaw Airflow meetup) about using shared
> memory
> > for inter-task communication.
> >
> > Airflow is not currently good for such a case. It sounds doable, but
> > fairly complex to implement (and it modifies the Airflow paradigm a
> > bit). I am not 100% sure if it's a good idea to have such a feature in
> > the future.
> >
> > I see the need for it and I like it, however I would love to ask you for
> > opinions.
> >
> > *Context*
> >
> > The case is to have several independent tasks using a lot of temporary
> data
> > in memory. They either run in parallel and share loaded data, or use
> shared
> > memory to pass results between tasks. Examples: machine learning (like
> > audio processing). It makes sense to only load the audio files once (to
> > memory) and run several tasks on those loaded data.
> >
> > The best way to achieve it now is to combine such memory-sharing tasks
> > into a single operator (Docker Compose, for example?) and run them as a
> > single
> > Airflow Task. But maybe those tasks could still be modelled as separate
> > tasks in Airflow DAG. One benefit is that there might be different
> > dependencies for different tasks, processing results from some tasks
> could
> > be sent independently using different - existing - operators.
> >
> > As a workaround - we can play with queues and have one dedicated machine
> to
> > run all such tasks, but it has multiple limitations.
> >
> > *High-level idea*
> >
> > At a high level, it would require defining some affinity between tasks
> > to make
> > sure that:
> >
> > 1) they are all executed on the same worker machine
> > 2) the processes should remain in-memory until all tasks finish for data
> > sharing (even if there is a dependency between the tasks)
> > 3) back-filling should act on the whole group of such tasks as "single
> > unit".
> >
> > I would love to hear your feedback.
> >
> > J
> >
> >
> > [1] Slack discussion on shared memory:
> > https://apache-airflow.slack.com/archives/CCR6P6JRL/p1574745209437200
> >
> > J.
> > --
> >
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> >
> > M: +48 660 796 129 <+48660796129>
> > [image: Polidea] <https://www.polidea.com/>
> >
>
