I really agree with most of what was posted above, but I particularly love what Evgeny wrote about having a DAG API. As an end user, I would love to be able to provide different implementations of core DAG functionality, similar to how the Executor can already be subclassed. Some key behavior points I have either personally had to override or work around, or would like to if it were easier:
- DAG schedule (currently defined by the DAG after it has been parsed)
- DAG discovery (currently always a scheduler subprocess)
- DAG state (currently stored directly in the DB)
- Loading content for discovered DAGs (currently always on-disk, which causes versioning problems)
- Parsing content from discovered DAGs (currently always a scheduler subprocess, which causes performance problems)
- Providing DAG result transfer/persistence (currently only XCom, which causes many problems)

Breaking DAG functionality into a set of related APIs would let Airflow keep a good "out of the box" experience, or a simple git-based deployment mode, while unlocking a lot of capability for users with more sophisticated needs.

For example, we're using Argo Workflows nowadays, which is more Kubernetes-native but also more limited. I could easily envision a DAG implementation where Airflow stores historical executions; stores git commits and retrieves DAG definitions as needed; launches Argo Workflows to perform both DAG parsing _and_ task execution; and stores results in Airflow. This would make Airflow the system of record (permanent history) and Argo the ephemeral execution layer (deleted after a few days). End users could submit from Airflow even without Kubernetes access, and wouldn't need Kubernetes access to view results or logs.

Another example: we have notebook users who often need to schedule pipelines, which they do in ad hoc ways. Instead, they could import Airflow and define a DAG in Python, then call a command to execute it remotely. This would run on our "staging" Airflow cluster, with access to staging credentials, but as a DAG named something like "username.git_repo.notebook_name". Each run would freeze the current notebook (e.g. to S3), execute it, and store results via papermill. This would let users launch multiple iterations of a notebook (e.g. looping over a parameter list), run them for a while, and pick the one that does what they need.
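To make the "set of related APIs" idea concrete, here's a rough sketch of what splitting those behaviors into pluggable interfaces might look like. All names here are hypothetical — nothing like this exists in Airflow today:

```python
import abc


class DagContentStore(abc.ABC):
    """Where DAG definitions live (on-disk today; could be git, S3, ...)."""

    @abc.abstractmethod
    def fetch(self, dag_id: str, version: str) -> bytes: ...


class DagParser(abc.ABC):
    """How DAG content becomes a DAG object (a scheduler subprocess today;
    could be a remote job, e.g. an Argo Workflow)."""

    @abc.abstractmethod
    def parse(self, content: bytes): ...


class ResultStore(abc.ABC):
    """Where task results go (only XCom today; could be S3, GCS, ...)."""

    @abc.abstractmethod
    def put(self, key: str, value): ...

    @abc.abstractmethod
    def get(self, key: str): ...


class InMemoryResultStore(ResultStore):
    """A trivial implementation, e.g. for tests or local runs."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data[key]
```

The point isn't these exact interfaces — it's that once the boundaries exist, the git-backed, Argo-backed, and DB-backed variants become independent plugins instead of forks.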
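To sketch the notebook flow (again, names and layout are hypothetical), each submission could build its DAG id and fan out over a parameter list like this, with each resulting run config handed to papermill against the frozen notebook:

```python
import itertools


def notebook_dag_id(username: str, git_repo: str, notebook_name: str) -> str:
    """Build the DAG id under which the frozen notebook would run."""
    return f"{username}.{git_repo}.{notebook_name}"


def parameter_sweep(base_params: dict, sweep: dict) -> list:
    """Expand a dict of parameter lists into one run config per combination."""
    keys = list(sweep)
    runs = []
    for values in itertools.product(*(sweep[k] for k in keys)):
        params = dict(base_params)
        params.update(zip(keys, values))
        runs.append(params)
    return runs
```

Each run config would then become one execution of the frozen notebook copy (e.g. from S3), so users can compare iterations side by side and keep the one they want.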
In general, there's been no end to the frustration of DAGs being tightly coupled to a specific on-disk layout, specific Python packaging, etc., and I'd love to be able to cleanly write alternative implementations.

On Fri, Feb 21, 2020 at 4:44 PM Massy Bourennani <[email protected]> wrote:

> Hi all,
>
> We are using Airflow to put batch ML models in production (only the
> prediction is done).
> [image: image.png]
>
> Above is an example of a DAG we are using to execute an ML model written
> in Python (scikit-learn).
>
> 1. At first, data is extracted from BigQuery. The SQL query is in a
>    DS-owned GitHub repository.
> 2. Then, the trained model, which is serialized with pickle, is taken
>    from GitHub.
> 3. After that, an Apache Beam job is started on Dataflow which is
>    responsible for reading the input files, downloading the model from
>    GCS, deserializing it, and using it to predict the score of each data
>    point. Below you can see the expected interface of every model.
> 4. At the end, the results are saved in BigQuery/GCS.
>
> class WhateverModel:
>     def predict(batch: collections.abc.Iterable) -> collections.abc.Iterable:
>         """
>         :param batch: a collection of dicts
>         :type batch: collection(dict)
>         :return: a collection of dicts. there should be a score in one of
>             the fields of every dict (every datapoint)
>         """
>         pass
>
> Key points:
> * Every input (the SQL query used to extract the dataset, the ML model,
>   the custom packages used by the model to set up Dataflow workers) is in
>   GitHub. So we can go from one version to another and use it fairly
>   easily; all we need are some qualifiers: GitHub repo, path to file, and
>   tag version.
> * DS are free to use whatever Python library they want thanks to Apache
>   Beam, which provides a way to initialize workers with custom packages.
>
> Weak points:
> * Whenever DS update one of the inputs (SQL query, Python packages, or
>   model), we (DE) need to update the DAG.
> * It's really specific to batch Python ML models.
>
> Below is a code snippet of a DAG instantiation:
>
> with Py3BatchInferenceDAG(
>         dag_id='mydags.execute_my_ml_model',
>         sql_repo_name='app/data-learning',
>         sql_file_path='data_learning/my_ml_model/sql/predict.sql',
>         sql_tag_name='my_ml_model_0.0.12',
>         model_repo_name='app/data-models',
>         model_file_path='repository/my_ml_model/4/model.pkl',
>         model_tag_name='my_ml_model_0.0.4',
>         python_packages_repo_name='app/data-learning',
>         python_packages_tag_name='my_ml_model_0.0.9',
>         python_packages_paths=['data_learning/my_ml_model/python_packages/package_one/'],
>         params={  # setup of Dataflow workers
>             'custom_commands': ['apt-get update',
>                                 'gsutil cp -r gs://bucket/package_one /tmp/',
>                                 'pip install /tmp/package_one/'],
>             'required_packages': ['dill==0.2.9', 'numpy', 'pandas',
>                                   'scikit-learn', 'google-cloud-storage']
>         },
>         external_dag_requires=['mydags.dependency_one$ds',
>                                'mydags.dependency_two$ds'],
>         destination_project_dataset_table='mydags.my_ml_model${{ ds_nodash }}',
>         schema_path=Common.get_file_path(__file__, "schema.yaml"),
>         start_date=datetime(2019, 12, 10),
>         schedule_interval=CronPresets.daily()) as dag:
>     pass
>
> I hope it helps.
> Regards,
> Massy
>
> On Thu, Feb 20, 2020 at 8:36 AM Evgeny Shulman <[email protected]> wrote:
>
>> Hey everybody,
>> (Fully agreed on Dan's post. These are the main pain points we see and
>> are trying to fix. Here is our reply on the thread topic.)
>>
>> We have numerous ML engineers that use our open source project (DBND)
>> with Airflow for their everyday work. We help them create and monitor
>> ML/data pipelines of different complexity levels and infra requirements.
>> After 1.5 years doing that as a company, and a few years doing it as
>> part of a big enterprise organization before we started Databand, these
>> are the main pain points we think about when it comes to Airflow:
>>
>> A. DAG versioning - ML teams change DAGs constantly. The first limitation
>> they hit is not being able to review historical information of previous
>> DAG runs based on the exact version of the DAG that executed. Our plugin
>> 'dbnd-airflow-versioned-dag' is our approach to that: we save, and show
>> in the Airflow UI, every specific version of the DAG. This is important
>> in ML use cases because of the data science experimentation cycle and the
>> need to trace exactly what code/data went into a model.
>>
>> B. A better version of the backfill command - we had to reimplement the
>> BackfillJob class to be able to run specific DAG versions.
>>
>> C. Running the same DAG in different environments - people want to run
>> the same DAG locally and on GCP/AWS without changing all the code. We did
>> that by abstracting Spark/Python/Docker code execution so we can easily
>> switch from one infra to another, wrapping all infra logic in generic
>> gateway "operators" with extensive use of existing Airflow hooks and
>> operators.
>>
>> D. Data passing & versioning - being able to pass data from operator to
>> operator, version the data, and do that with easy authoring of DAGs &
>> sub-DAGs. Pipelines grow in complexity very quickly, and it will be hard
>> to agree on what the "right" SDK is here. Airflow is very much "built by
>> engineers for engineers"; DAGs are created to be executed as scheduled
>> production jobs. It's going to be a long journey to reach a common
>> conclusion on what needs to be done at a higher level around task/data
>> management.
>> Some people from the Airflow community went and started new
>> orchestration companies after they didn't manage to make a significant
>> change in the data model of Airflow.
>>
>> Our biggest wish-list item in Airflow as advanced users:
>>
>> *A low-level API to generate and run DAGs.*
>> So far there are numerous extensions, and all of them solve this by
>> creating another dag.py file with the DAG generation. But neither the
>> scheduler nor the UI can support that fully. The moment the scheduler,
>> together with the UI, is open for "versioned DAGs", a lot of nice DSLs
>> and extensions will emerge. Data analysts will get more GUI-driven tools
>> to generate DAGs, ML engineers will be able to run and iterate on their
>> algorithms, and data engineers will be able to implement their DAG
>> DSL/SDK the way they see fit for their company.
>>
>> Most users of DBND author their ML pipelines without knowing that
>> Airflow is orchestrating behind the scenes. They submit
>> Python/Spark/notebooks without knowing that the DAG is going to be run
>> through the Airflow subsystem. Only when they see the Airflow webserver
>> do they start to discover that Airflow is there. And this is the way it
>> should be. ML developers don't like new frameworks; they just like to
>> see data flowing from task to task, and ways to push work to production
>> with minimal "external" code involved.
>>
>> Evgeny.
>>
>> On 2020/02/19 16:46:44, Dan Davydov <[email protected]> wrote:
>> > Twitter uses Airflow primarily for ML, to create automated pipelines
>> > for retraining data, but also for more ad-hoc training jobs.
>> >
>> > The biggest gaps are on the experimentation side. It takes too long
>> > for a new user to set up and run a pipeline and then iterate on it.
>> > This problem is a bit more unique to ML than other domains because
>> > 1) training jobs can take a very long time to run, and 2) users need
>> > to launch multiple experiments in parallel for the same model pipeline.
>> >
>> > Biggest gaps:
>> > - Too much boilerplate to write DAGs compared to Dagster etc., and
>> > difficulty in message passing (XCom). There was a proposal recently to
>> > improve this in Airflow which should be entering AIP soon.
>> > - Lack of pipeline isolation, which hurts model experimentation (being
>> > able to run a DAG, modify it, and run it again without affecting the
>> > previous run); the lack of isolation of DAGs from Airflow
>> > infrastructure (inability to redeploy Airflow infra without also
>> > redeploying DAGs) also hurts.
>> > - Lack of multi-tenancy; it's hard for customers to quickly launch an
>> > ad-hoc pipeline, and the overhead of setting up a cluster and all of
>> > its dependencies is quite high.
>> > - Lack of integration with data visualization plugins (e.g. plugins
>> > for rendering data related to a task when you click a task instance in
>> > the UI).
>> > - Lack of simpler abstractions for users with limited knowledge of
>> > Airflow, or even Python, to build simple pipelines (not really an
>> > Airflow problem, but rather the need for a good abstraction that sits
>> > on top of Airflow, like a drag-and-drop pipeline builder).
>> >
>> > FWIW my personal feeling is that a fair number of companies in the ML
>> > space are moving to alternate solutions like TFX Pipelines due to the
>> > focus these platforms have on ML (ML pipelines are first-class
>> > citizens), and support from Google. Would be great if we could change
>> > that. The ML orchestration/tooling space is definitely evolving very
>> > rapidly, and there are new promising entrants as well.
>> >
>> > On Wed, Feb 19, 2020 at 10:56 AM Germain Tanguy
>> > <[email protected]> wrote:
>> >
>> > > Hello Daniel,
>> > >
>> > > In my company we use Airflow to update our ML models and to predict.
>> > >
>> > > As we use the KubernetesPodOperator to trigger jobs, each ML DAG is
>> > > similar, and ML/data science engineers can reuse a template and
>> > > choose which type of machine they need (highcpu, highmem, GPU or
>> > > not, etc.)
>> > >
>> > > We have a process in place described in the second part of this
>> > > article (Industrializing machine learning pipeline):
>> > > https://medium.com/dailymotion/collaboration-between-data-engineers-data-analysts-and-data-scientists-97c00ab1211f
>> > >
>> > > Hope this helps.
>> > >
>> > > Germain.
>> > >
>> > > On 19/02/2020 16:42, "Daniel Imberman" <[email protected]> wrote:
>> > >
>> > > Hello everyone!
>> > >
>> > > I'm working on a few proposals to make Apache Airflow more friendly
>> > > for ML/data science use-cases, and I wanted to reach out in hopes of
>> > > hearing from people that are using, or wish to use, Airflow for ML.
>> > > If you have any opinions on the subject, I'd love to hear what
>> > > you're all working on!
>> > >
>> > > Current questions I'm looking into:
>> > >
>> > > 1. How do you use Airflow for your ML? Has it worked out well for you?
>> > > 2. Are there any features that would improve your experience of
>> > > building models on Airflow?
>> > > 3. Have you built anything on top of Airflow/around Airflow to aid
>> > > you in this process?
>> > >
>> > > Thank you so much for your time!
