Really like the proposal, updated AIP doc for now. Updates:
• Changed name (as pointed out by Ash)
• Updated example with new decorator interface
• Updated description of decorator to match the new proposed interface
• Added link to Airflow Slack thread into AIP for clarity

Gerard Casas Saez
Twitter | Cortex | @casassaez
On Feb 26, 2020, 11:09 AM -0700, Ash Berlin-Taylor <a...@apache.org>, wrote:
> Chatting with Gerard on slack a bit and we think a new name for this is
> "Functional DAG definition" -- yes this is still an API, but it's too easily
> confused with an HTTP/REST API.
>
> The page now lives at
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-31%3A+Airflow+functional+DAG+definition
> Also chatting and we think we have a nicer name for the decorators, and two
> options (Probably worth having both of them)
> @dag.task and from airflow import task (this is somewhat mirroring Celery
> design)
> -example-
> import json
> from datetime import datetime, timedelta
>
> from airflow.operators.email_operator import EmailOperator
> from airflow.operators.http_operator import SimpleHttpOperator
> from airflow.operators import decorators
>
> from airflow import DAG
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2019, 12, 27),
>     'retries': 1,
>     'retry_delay': timedelta(minutes=1),
> }
>
> with DAG(
>     'send_server_ip', default_args=default_args, schedule_interval=None
> ) as dag:
>
>     # Using default connection as it's set to httpbin.org by default
>     get_ip = SimpleHttpOperator(
>         task_id='get_ip', endpoint='get', method='GET', xcom_push=True
>     )
>
>     @dag.task(provide_context=True)
>     def email_subject_generator(raw_json: str) -> str:
>         external_ip = json.loads(raw_json)['origin']
>         return f"Server connected from {external_ip}"
>
>     @dag.task
>     def email_body_generator(raw_json: str) -> str:
>         return """
> Seems like today your server executing Airflow is connected from the external IP {origin}<br>
> <br>
> Have a nice day!
> """.format(**json.loads(raw_json))
>
>     send_email = EmailOperator(
>         task_id='send_email',
>         to="exam...@example.com",
>         subject='',
>         html_content=''
>     )
>
>     ip_info = get_ip()
>     subject = email_subject_generator(ip_info)
>     body = email_body_generator(ip_info)
>     send_email(subject=subject, html_content=body)
>
> -end-example-
> With the dag.task decorator, the with dag context manager is not needed, and
> the task is assigned to the specific dag.
> The airflow.task decorator (lazily importing from airflow.decorators.task)
> makes the function an operator, but doesn't automatically assign it to a dag,
> unless you are already in a with dag: block.
> What do people think of this tweaked interface?
> -Ash
> On Feb 25 2020, at 10:28 pm, Gerard Casas Saez
> <gcasass...@twitter.com.INVALID> wrote:
> > Short update on status after receiving some feedback on Slack and
> > Confluence comments:
> >
> > • Updated description of PythonFunctionalOperator
> > • Added a few clarifying comments on how each component change plays along
> > with the others
> >
> > Reached out to Bolke to see if I can get his thoughts on how Lineage and
> > Functional DAG API overlap (not sure what’s the best format for that
> > discussion, but will be updating this thread as I go along).
> > Unless there’s someone strongly against the current proposal, I plan
> > on submitting the proposal for vote by mid-next week and start planning
> > implementation tasks after that. Haven’t been able to find a good guideline
> > for this process, mainly looking at other completed AIPs. Let me know if I
> > should wait more.
> > Best,
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> > On Feb 24, 2020, 10:31 AM -0700, Jarek Potiuk <jarek.pot...@polidea.com>,
> > wrote:
> > > Ah yeah... I totally forgot about that :) (shame on me) ... But it does
> > > seem appropriate if I came to the same conclusion again looking from
> > > another angle.... :D
> > >
> > > J.
> > >
> > > On Mon, Feb 24, 2020 at 6:25 PM Gerard Casas Saez
> > > <gcasass...@twitter.com.invalid> wrote:
> > >
> > > > Agree, I initially pitched the idea on the lineage thread and was
> > > > encouraged to pitch it separately. I would love to help figure out how
> > > > to align these two projects better.
> > > >
> > > > Bolke - want to set up a call or how should we discuss this better?
> > > > Would love to hear feedback on my proposal.
> > > >
> > > > Gerard Casas Saez
> > > > Twitter | Cortex | @casassaez
> > > > On Feb 23, 2020, 1:44 AM -0700, Jarek Potiuk <jarek.pot...@polidea.com>,
> > > > wrote:
> > > > > I like the idea a lot. Good direction.
> > > > >
> > > > > I know we have a few people who are better at functional thinking
> > > > > than me so I think I would love those people to work it out. Happy to
> > > > > listen to the discussions :)
> > > > >
> > > > > One thing struck me, however. I am not sure if that falls in the
> > > > > same camp, but I have a feeling that there is at least some common
> > > > > part with the proposal from Bolke about the Lineage.
> > > > > Those two things (functional DAG API and Lineage) are not directly
> > > > > connected but I think the design of both has at least some common part
> > > > > and it would make sense that at least we talk about this and how they
> > > > > play together.
> > > > >
> > > > > WDYT Bolke?
> > > > >
> > > > > J
> > > > > On Fri, Feb 21, 2020 at 7:13 PM Dan Davydov
> > > > > <ddavy...@twitter.com.invalid> wrote:
> > > > > > Here is the link to the AIP for folks' convenience:
> > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-31%3A+Airflow+functional+DAG+API
> > > > > >
> > > > > > The proposal and this all looks really good to me :)!
> > > > > > I do want to call out to others that it's important we get the
> > > > > > interface 95%+ right from the get-go since it could be hard to
> > > > > > change later once users start depending on it.
> > > > > >
> > > > > > On Fri, Feb 21, 2020 at 12:07 PM Gerard Casas Saez
> > > > > > <gcasass...@twitter.com.invalid> wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > > Sending a new message to everyone to gather feedback on the AIP-31
> > > > > > > about Airflow functional DAG API. This was initially discussed and
> > > > > > > proposed in [DISCUSS] Airflow functional DAGs. After leaving open
> > > > > > > a small doc to iterate on the proposal for a couple weeks, I
> > > > > > > decided to move forward to formalize it as an AIP document in
> > > > > > > Confluence. I will still be tracking comments on the doc but would
> > > > > > > rather move the conversation here.
> > > > > > >
> > > > > > > AIP-31 focuses on solving the issue about implicit message
> > > > > > > passing in Airflow by extending the DAG/Operator API to include a
> > > > > > > way to set up a message passing dependency on the same DAG file.
> > > > > > > This is complementary to task dependency declaration and is
> > > > > > > intended to be used as another option to declare dependencies by
> > > > > > > declaring message dependencies.
> > > > > > >
> > > > > > > In addition, AIP-31 proposes a way to declare PythonOperators
> > > > > > > from a function using decorators. This should help embed custom
> > > > > > > behavior into DAGs without needing to create custom operators for
> > > > > > > everything.
> > > > > > >
> > > > > > > Changes proposed:
> > > > > > >
> > > > > > > • Add __call__ function in BaseOperator: Add a functional
> > > > > > > interface to replace class attributes at execution time.
> > > > > > > • XComArg class: This object is a reference to an XCom value that
> > > > > > > has not been created and will need to be resolved in the future.
> > > > > > > • PythonFunctionalOperator and Python Function Operator: Extend
> > > > > > > PythonOperator to map op_args and op_kwargs from a decorated
> > > > > > > interface for easier set up and add a decorator to create
> > > > > > > PythonOperators from a function in an easier way.
> > > > > > >
> > > > > > > See AIP document for a DAG example.
> > > > > > > Any help on how to proceed with this will be appreciated (also
> > > > > > > joined Slack and posted in #airflow-creative). My guess is once we
> > > > > > > agree on what the API should look like, next step is to do a vote
> > > > > > > and if successful to create JIRA issues and GitHub PRs with the
> > > > > > > mentioned changes.
> > > > > > >
> > > > > > > Please let me know if there’s any aspect that people feel
> > > > > > > strongly opinionated against or aspects that are not clear and I
> > > > > > > should work on expanding further.
> > > > > > >
> > > > > > > Best,
> > > > > > > Gerard Casas Saez
> > > > > > > Twitter | Cortex | @casassaez
> > > > >
> > > > > --
> > > > > Jarek Potiuk
> > > > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > > > >
> > > > > M: +48 660 796 129 <+48660796129>
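
For readers following the thread, the three proposed changes in the original message (calling a task to wire it up, an XComArg-style placeholder for a value that does not exist yet, and a decorator that turns a function into a task) can be illustrated with a toy model. This is only a sketch and does not use Airflow at all: ToyDag, ToyTask and ToyXComArg are hypothetical names invented here, and the real AIP-31 interface may behave differently.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class ToyXComArg:
    """Hypothetical placeholder for a value a task will produce later."""
    producer: "ToyTask"


@dataclass
class ToyTask:
    """Hypothetical stand-in for an operator built from a plain function."""
    task_id: str
    fn: Callable[..., Any]
    args: tuple = ()
    upstream: List["ToyTask"] = field(default_factory=list)

    def run(self, results: Dict[str, Any]) -> Any:
        # Swap each placeholder for the value its producer already returned.
        resolved = [
            results[a.producer.task_id] if isinstance(a, ToyXComArg) else a
            for a in self.args
        ]
        results[self.task_id] = self.fn(*resolved)
        return results[self.task_id]


class ToyDag:
    """Hypothetical DAG that records tasks created through its decorator."""

    def __init__(self) -> None:
        self.tasks: List[ToyTask] = []

    def task(self, fn: Callable[..., Any]) -> Callable[..., ToyXComArg]:
        # Calling the decorated function registers a task instead of running it.
        def wrapper(*args: Any) -> ToyXComArg:
            t = ToyTask(task_id=fn.__name__, fn=fn, args=args)
            # Passing a placeholder as an argument implies a dependency.
            t.upstream = [a.producer for a in args if isinstance(a, ToyXComArg)]
            self.tasks.append(t)
            return ToyXComArg(producer=t)
        return wrapper

    def run(self) -> Dict[str, Any]:
        # Registration order is a valid execution order in this toy model,
        # because a placeholder has to exist before it can be passed along.
        results: Dict[str, Any] = {}
        for t in self.tasks:
            t.run(results)
        return results


dag = ToyDag()


@dag.task
def get_ip() -> dict:
    return {"origin": "203.0.113.7"}  # made-up sample payload


@dag.task
def email_subject_generator(info: dict) -> str:
    return f"Server connected from {info['origin']}"


ip_info = get_ip()                           # nothing has executed yet
subject = email_subject_generator(ip_info)   # dependency recorded here
results = dag.run()
print(results["email_subject_generator"])
```

The point of the placeholder is that the message dependency (subject needs the output of get_ip) doubles as the task dependency, so nothing has to be declared twice.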