I really agree with most of what was posted above but particularly love
what Evgeny wrote about having a DAG API. As an end user, I would love to
be able to provide different implementations of core DAG functionality,
similar to how the Executor can already be subclassed. Some key behaviors I
have either had to override or work around personally, or would like to if it
were easier:

- DAG schedule (currently defined by the DAG after it has been parsed)
- DAG discovery (currently always a scheduler subprocess)
- DAG state (currently stored directly in the DB)
- Loading content for discovered DAGs (currently always from local disk, which
  causes versioning problems)
- Parsing content from discovered DAGs (currently always a scheduler
  subprocess, which causes performance problems)
- Providing DAG result transfer/persistence (currently only XCom, which causes
  many problems)

Breaking DAG functionality into a set of related APIs would let Airflow keep a
good "out of the box" experience (or a simple git-based deployment mode) while
unlocking a lot of capability for users with more sophisticated needs.
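
To make that concrete, here is a rough sketch of the kind of pluggable
interface I have in mind. None of these classes or methods exist in Airflow
today; the names are purely illustrative, analogous to the way an Executor can
be subclassed:

import abc

class DagBackend(abc.ABC):
    """Hypothetical seam for swapping out core DAG behavior."""

    @abc.abstractmethod
    def discover_dags(self):
        """Return identifiers of known DAGs (instead of always scanning
        a dags/ folder in a scheduler subprocess)."""

    @abc.abstractmethod
    def load_dag_source(self, dag_id, version):
        """Fetch the DAG definition for a specific version (e.g. from a
        git commit or an artifact store rather than local disk)."""

    @abc.abstractmethod
    def parse_dag(self, source):
        """Turn the fetched source into a DAG object, not necessarily in
        a scheduler subprocess."""

    @abc.abstractmethod
    def persist_result(self, task_instance, result):
        """Store task results somewhere other than XCom."""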

For example, we're using Argo Workflows nowadays, which is more
Kubernetes-native but also more limited. I could easily envision a DAG
implementation where Airflow stores historical executions; stores git commits
and retrieves DAG definitions as needed; launches Argo Workflows to perform
both DAG parsing _and_ task execution; and stores results back in Airflow.
This would make Airflow the system of record (permanent history) and Argo the
ephemeral execution layer (deleted after a few days). End users could submit
runs from Airflow without Kubernetes access, and wouldn't need it to view
results or logs.
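
Purely to illustrate the hand-off (Airflow has no such hook today; I'm
assuming the stock argo CLI is available on the worker, and the manifest path
is made up):

import subprocess

def run_on_argo(workflow_manifest: str) -> None:
    # "argo submit --wait" blocks until the Argo Workflow finishes or fails;
    # the hypothetical DAG backend would then pull results/logs back into Airflow.
    subprocess.run(["argo", "submit", "--wait", workflow_manifest], check=True)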

Another example: we have notebook users who often need to schedule pipelines,
which they currently do in ad hoc ways. Instead, they could import Airflow and
define a DAG in Python, then call a command to execute it remotely. This would
run on our "staging" Airflow cluster, with access to staging credentials, but
as a DAG named something like "username.git_repo.notebook_name". Each run
would freeze the current notebook (e.g. to S3), execute it, and store results
via papermill; this would let users launch multiple iterations of a notebook
(e.g. looping over a parameter list), run them for a while, and pick the one
that does what they need.
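
A very rough sketch of what one such run could look like (bucket names, paths,
and parameters are made up; papermill.execute_notebook is the only real API
used here):

import papermill as pm

def run_frozen_notebook(frozen_uri: str, output_uri: str, params: dict) -> None:
    # papermill executes the notebook and writes an output copy with results;
    # S3 URIs work if the optional S3 dependencies are installed.
    pm.execute_notebook(frozen_uri, output_uri, parameters=params)

# e.g. launching several iterations over a parameter list
for lr in [0.1, 0.01, 0.001]:
    run_frozen_notebook(
        "s3://some-bucket/frozen/username.git_repo.notebook_name.ipynb",
        "s3://some-bucket/results/notebook_lr_%s.ipynb" % lr,
        {"learning_rate": lr},
    )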

In general, there's been no end to the frustration of DAGs being tightly
coupled to a specific on-disk layout, specific Python packaging, etc., and
I'd love to be able to cleanly write alternative implementations.

On Fri, Feb 21, 2020 at 4:44 PM Massy Bourennani <[email protected]>
wrote:

> Hi all,
>
> We are using Airflow to put batch ML models in production (only the
> prediction step runs in Airflow).
> [image: image.png]
>
> Above is an example of a DAG we are using to execute an ML model written
> in Python (scikit-learn).
>
>    1. At first, data is extracted from BigQuery. The SQL query lives in a
>    DS-owned GitHub repository
>    2. Then, the trained model, which is serialized with pickle, is taken
>    from GitHub
>    3. After that, an Apache Beam job is started on Dataflow, which is
>    responsible for reading the input files, downloading the model from GCS,
>    deserializing it, and using it to predict the score of each data point.
>    Below you can see the expected interface of every model
>    4. At the end, the results are saved in BigQuery/GCS
>
> class WhateverModel:
>     def predict(self, batch: collections.abc.Iterable) -> collections.abc.Iterable:
>         """
>         :param batch: a collection of dicts
>         :type batch: collection(dict)
>         :return: a collection of dicts; there should be a score in one of
>             the fields of every dict (every datapoint)
>         """
>         pass
>
> key points:
> * Every input (the SQL query used to extract the dataset, the ML model, and
> the custom packages used by the model to set up Dataflow workers) is in
> GitHub, so we can go from one version to another fairly easily; all we need
> are some qualifiers: GitHub repo, path to file, and tag version.
> * DS are free to use whatever Python library they want, thanks to Apache
> Beam, which provides a way to initialize workers with custom packages.
>
> weak points:
> * Whenever DS update one of the inputs (SQL query, Python packages, or
> model), we, the DE team, need to update the DAG
> * It's really specific to batch Python ML models
>
> Below is a code snippet of a DAG instantiation
>
> with Py3BatchInferenceDAG(dag_id='mydags.execute_my_ml_model',
>                           sql_repo_name='app/data-learning',
>                           sql_file_path='data_learning/my_ml_model/sql/predict.sql',
>                           sql_tag_name='my_ml_model_0.0.12',
>                           model_repo_name='app/data-models',
>                           model_file_path='repository/my_ml_model/4/model.pkl',
>                           model_tag_name='my_ml_model_0.0.4',
>                           python_packages_repo_name='app/data-learning',
>                           python_packages_tag_name='my_ml_model_0.0.9',
>                           python_packages_paths=['data_learning/my_ml_model/python_packages/package_one/'],
>                           params={  # setup of Dataflow workers
>                               'custom_commands': ['apt-get update',
>                                                   'gsutil cp -r gs://bucket/package_one /tmp/',
>                                                   'pip install /tmp/package_one/'],
>                               'required_packages': ['dill==0.2.9', 'numpy',
>                                                     'pandas', 'scikit-learn',
>                                                     'google-cloud-storage']
>                           },
>                           external_dag_requires=[
>                               'mydags.dependency_one$ds',
>                               'mydags.dependency_two$ds'
>                           ],
>                           destination_project_dataset_table='mydags.my_ml_model${{ ds_nodash }}',
>                           schema_path=Common.get_file_path(__file__, "schema.yaml"),
>                           start_date=datetime(2019, 12, 10),
>                           schedule_interval=CronPresets.daily()) as dag:
>     pass
>
> I hope it helps,
> Regards
> Massy
>
> On Thu, Feb 20, 2020 at 8:36 AM Evgeny Shulman <[email protected]>
> wrote:
>
>> Hey Everybody
>> (Fully agreed on Dan's post. These are the main pain points we see and are
>> trying to fix. Here is our reply on the thread topic.)
>>
>> We have numerous ML engineers who use our open source project (DBND)
>> with Airflow for their everyday work. We help them create and monitor
>> ML/data pipelines of different complexity levels and infra requirements.
>> After 1.5 years doing that as a company now, and a few years doing it as a
>> part of a big enterprise organization before we started Databand, these are
>> the main pain points we think about when it comes to Airflow:
>>
>> A. DAG Versioning - ML teams change DAGs constantly. The first limitation
>> they hit is not being able to review historical information of previous DAG
>> runs based on the exact version of the DAG that executed. Our
>> 'dbnd-airflow-versioned-dag' plugin is our approach to that: we save and
>> show in the Airflow UI every specific version of the DAG. This is important
>> in ML use cases because of the data science experimentation cycle and the
>> need to trace exactly what code/data went into a model.
>>
>> B. A better version of the backfill command - We had to reimplement the
>> BackfillJob class to be able to run specific DAG versions.
>>
>> C. Running the same DAG in different environments - People want to run
>> the same DAG locally and at GCP/AWS without changing all the code. We have
>> done that by abstracting Spark/Python/Docker code execution so we can
>> easily switch from one infra to another, wrapping all infra logic in
>> generic gateway "operators" with extensive use of existing Airflow hooks
>> and operators.
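>>
>> To give a feel for the pattern (a minimal sketch, not DBND's actual code;
>> the class name and the "environment" switch are invented for illustration):
>>
>> from airflow.models import BaseOperator
>>
>> class SubmitPythonJobOperator(BaseOperator):
>>     """Hypothetical gateway operator: run a callable locally or hand it
>>     to a cloud backend, without changing the DAG definition."""
>>
>>     def __init__(self, python_callable, environment="local", **kwargs):
>>         super().__init__(**kwargs)
>>         self.python_callable = python_callable
>>         self.environment = environment  # e.g. "local", "gcp", "aws"
>>
>>     def execute(self, context):
>>         if self.environment == "local":
>>             return self.python_callable()
>>         # A real implementation would delegate here to existing Airflow
>>         # hooks/operators (Dataproc, EMR, KubernetesPodOperator, ...).
>>         raise NotImplementedError("no backend wired for %s" % self.environment)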
>>
>> D. Data passing & versioning - being able to pass data from operator to
>> operator and to version that data, with easy authoring of DAGs and sub-DAGs
>> - pipelines grow in complexity very quickly. It will be hard to agree on
>> what the "right" SDK is here. Airflow is very much "built by engineers for
>> engineers": DAGs are created to be executed as scheduled production jobs.
>> It's going to be a long journey to reach a common conclusion on what needs
>> to be done at a higher level around task/data management. Some people from
>> the Airflow community went and started new orchestration companies after
>> they didn't manage to make a significant change to the data model of
>> Airflow.
>>
>> Our biggest wish list item in Airflow as advanced users:
>>
>> * A low-level API to generate and run DAGs *.
>> So far there are numerous extensions, and all of them solve this by
>> creating another dag.py file with the DAG generation, but neither the
>> scheduler nor the UI can support that fully. The moment the scheduler and
>> the UI are open to "versioned DAGs", a lot of nice DSLs and extensions will
>> emerge from that. Data analysts will get more GUI-driven tools to generate
>> DAGs, ML engineers will be able to run and iterate on their algorithms, and
>> data engineers will be able to implement their DAG DSL/SDK the way they see
>> fit for their company.
>>
>> Most users of DBND author their ML pipelines without knowing that Airflow
>> is orchestrating behind the scenes. They submit Python/Spark/notebook jobs
>> without knowing that the DAG is going to be run through the Airflow
>> subsystem; only when they see the Airflow webserver do they start to
>> discover that there is Airflow. And this is the way it should be. ML
>> developers don't like new frameworks; they just like to see data flowing
>> from task to task, and ways to push work to production with minimal
>> "external" code involved.
>>
>> Evgeny.
>>
>> On 2020/02/19 16:46:44, Dan Davydov <[email protected]>
>> wrote:
>> > Twitter uses Airflow primarily for ML, to create automated pipelines for
>> > retraining data, but also for more ad-hoc training jobs.
>> >
>> > The biggest gaps are on the experimentation side. It takes too long for a
>> > new user to set up and run a pipeline and then iterate on it. This problem
>> > is a bit more unique to ML than other domains because 1) training jobs can
>> > take a very long time to run, and 2) users have the need to launch multiple
>> > experiments in parallel for the same model pipeline.
>> >
>> > Biggest Gaps:
>> > - Too much boilerplate to write DAGs compared to Dagster/etc, and
>> > difficulty in message passing (XCom). There was a proposal recently to
>> > improve this in Airflow which should be entering AIP soon.
>> > - Lack of pipeline isolation, which hurts model experimentation (being
>> > able to run a DAG, modify it, and run it again without affecting the
>> > previous run); lack of isolation of DAGs from Airflow infrastructure
>> > (inability to redeploy Airflow infra without also redeploying DAGs) also
>> > hurts.
>> > - Lack of multi-tenancy; it's hard for customers to quickly launch an
>> > ad-hoc pipeline, and the overhead of setting up a cluster and all of its
>> > dependencies is quite high.
>> > - Lack of integration with data visualization plugins (e.g. plugins for
>> > rendering data related to a task when you click a task instance in the
>> > UI).
>> > - Lack of simpler abstractions for users with limited knowledge of
>> > Airflow or even Python to build simple pipelines (not really an Airflow
>> > problem, but rather the need for a good abstraction that sits on top of
>> > Airflow, like a drag-and-drop pipeline builder).
>> >
>> > FWIW my personal feeling is that a fair number of companies in the ML
>> > space are moving to alternate solutions like TFX Pipelines due to the
>> > focus these platforms have on ML (ML pipelines are first-class citizens),
>> > and support from Google. Would be great if we could change that. The ML
>> > orchestration/tooling space is definitely evolving very rapidly and there
>> > are also new promising entrants.
>> >
>> > On Wed, Feb 19, 2020 at 10:56 AM Germain Tanguy
>> > <[email protected]> wrote:
>> >
>> > > Hello Daniel,
>> > >
>> > > In my company we use Airflow to update our ML models and to run predictions.
>> > >
>> > > As we use the KubernetesPodOperator to trigger jobs, each ML DAG is
>> > > similar, and ML/data science engineers can reuse a template and choose
>> > > which type of machine they need (highcpu, highmem, GPU or not, etc.).
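>> > >
>> > > Roughly along these lines (a sketch only; the image, namespace and node
>> > > pool label below are placeholders, not our real configuration):
>> > >
>> > > from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
>> > >
>> > > def ml_job(task_id, image, machine_type="highcpu", dag=None):
>> > >     # one reusable template; engineers only pick the image and machine type
>> > >     return KubernetesPodOperator(
>> > >         task_id=task_id,
>> > >         name=task_id,
>> > >         namespace="ml",
>> > >         image=image,
>> > >         node_selectors={"cloud.google.com/gke-nodepool": machine_type},
>> > >         dag=dag,
>> > >     )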
>> > >
>> > > We have a process in place, described in the second part of this
>> > > article (Industrializing machine learning pipeline):
>> > > https://medium.com/dailymotion/collaboration-between-data-engineers-data-analysts-and-data-scientists-97c00ab1211f
>> > >
>> > > Hope this helps.
>> > >
>> > > Germain.
>> > >
>> > > On 19/02/2020 16:42, "Daniel Imberman" <[email protected]> wrote:
>> > >
>> > >     Hello everyone!
>> > >
>> > >     I’m working on a few proposals to make Apache Airflow more friendly
>> > > for ML/Data science use-cases, and I wanted to reach out in hopes of
>> > > hearing from people that are using/wish to use Airflow for ML. If you
>> > > have any opinions on the subject, I’d love to hear what you’re all
>> > > working on!
>> > >
>> > >     Current questions I’m looking into:
>> > >
>> > >      1. How do you use Airflow for your ML? Has it worked out well for you?
>> > >      2. Are there any features that would improve your experience of
>> > > building models on Airflow?
>> > >      3. Have you built anything on top of Airflow/around Airflow to aid
>> > > you in this process?
>> > >
>> > >     Thank you so much for your time!
>> > >
>> > >
>> > >
>> >
>>
>
