casassg commented on a change in pull request #8962: URL: https://github.com/apache/airflow/pull/8962#discussion_r441013469
##########
File path: docs/concepts.rst
##########
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically assign new operators to th
     op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs
+---------------
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
+:ref:`XComs <concepts:xcom>` values. In addition, you can wrap functions as tasks using the
+:ref:`task decorator <concepts:task_decorator>`. Dependencies are automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+    with DAG(
+        'send_server_ip', default_args=default_args, schedule_interval=None
+    ) as dag:
+
+        # Using default connection as it's set to httpbin.org by default
+        get_ip = SimpleHttpOperator(
+            task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+        )
+
+        @dag.task(multiple_outputs=True)
+        def prepare_email(raw_json: str) -> str:
+            external_ip = json.loads(raw_json)['origin']
+            return {
+                'subject':f'Server connected from {external_ip}',
+                'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
+            }
+
+        email_info = prepare_email(get_ip.output)

Review comment:
The `output` part was already implemented in a previous PR; this PR focuses on `@task` only.

The main difference between the proposed approach and the current one is that the current approach does not require `call`ing regular operators, which should reduce the complexity of creating functional DAGs. In addition, existing operators may have mandatory templated fields (for example, `subject` is required when initializing `EmailOperator`). With the call-based approach, such fields have to be given placeholder values at initialization and then overwritten on `call`, which is confusing (see the example below or the one in the AIP).

`output` is simply a way to access the `XComArg` of operators that are not callable.
This lets regular operators and `@task` operators be mixed more smoothly when defining functional DAGs.

Old example:

```python
import json
from typing import Dict

get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
)

@dag.task(multiple_outputs=True)
def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

# Regular operators must be called to obtain their output
server_info = get_ip()
email_info = prepare_email(server_info)
# Mandatory templated fields need dummy values here...
send_email = EmailOperator(
    task_id='send_email', to='exam...@example.com', subject='', html_content=''
)
# ...and are then overwritten on call
send_email(subject=email_info['subject'], html_content=email_info['body'])
```

New approach:

```python
import json
from typing import Dict

get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
)

@dag.task(multiple_outputs=True)
def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

# `.output` gives the XComArg of a regular operator without calling it
email_info = prepare_email(get_ip.output)
# XComArgs are passed straight into the templated fields at initialization
send_email = EmailOperator(
    task_id='send_email',
    to='exam...@example.com',
    subject=email_info['subject'],
    html_content=email_info['body']
)
```

I believe the new approach is better. I'm happy to update the AIP and submit it for a vote if that turns out to be required.
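The idea behind `.output` (pass a reference to another task's result into an operator's constructor, infer the dependency from it, and fill mandatory templated fields at run time) can be illustrated with a small toy model. Everything below is hypothetical and written only for illustration: `ToyXComArg`, `ToyOperator`, `ToyEmailOperator`, `resolve_xcom_args` and the in-memory dict standing in for XCom are not Airflow's API. The sketch assumes, like the examples above, that a `multiple_outputs` task stores each returned key as a separate value.

```python
from typing import Any, Dict, Set, Tuple

# Hypothetical in-memory stand-in for XCom: (task_id, key) -> value.
XComStore = Dict[Tuple[str, str], Any]


class ToyXComArg:
    """Toy stand-in for an XComArg: a lazy reference to a task's output."""

    def __init__(self, operator: "ToyOperator", key: str = "return_value") -> None:
        self.operator = operator
        self.key = key

    def __getitem__(self, key: str) -> "ToyXComArg":
        # Indexing points at one key of a multiple_outputs result.
        return ToyXComArg(self.operator, key)

    def resolve(self, store: XComStore) -> Any:
        return store[(self.operator.task_id, self.key)]


class ToyOperator:
    """Toy operator: ToyXComArg kwargs become upstream dependencies."""

    template_fields: Tuple[str, ...] = ()

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.upstream_task_ids: Set[str] = set()
        for name, value in kwargs.items():
            setattr(self, name, value)
            if isinstance(value, ToyXComArg):
                # Dependency is inferred from the data flow; no call() needed.
                self.upstream_task_ids.add(value.operator.task_id)

    @property
    def output(self) -> ToyXComArg:
        # Reference to this task's return value, without calling the operator.
        return ToyXComArg(self)

    def resolve_xcom_args(self, store: XComStore) -> None:
        # At run time, replace references in templated fields with real values.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, ToyXComArg):
                setattr(self, name, value.resolve(store))


class ToyEmailOperator(ToyOperator):
    # Mandatory templated fields accept references directly at construction.
    template_fields = ("subject", "html_content")

    def __init__(self, task_id: str, to: str, subject: Any, html_content: Any) -> None:
        super().__init__(task_id, to=to, subject=subject, html_content=html_content)


get_ip = ToyOperator(task_id="get_ip")
prepare_email = ToyOperator(task_id="prepare_email", raw_json=get_ip.output)
email_info = prepare_email.output
send_email = ToyEmailOperator(
    task_id="send_email",
    to="someone@example.com",
    subject=email_info["subject"],
    html_content=email_info["body"],
)

# Simulate the values prepare_email would have pushed.
store: XComStore = {
    ("prepare_email", "subject"): "Server connected from 203.0.113.7",
    ("prepare_email", "body"): "Connected from 203.0.113.7<br>",
}
send_email.resolve_xcom_args(store)
```

In this toy, no operator is ever called: the dependency graph falls out of the constructor arguments, and `subject` never needs a placeholder value, which is the property the comment argues for.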