casassg commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r441013469



##########
File path: docs/concepts.rst
##########
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically assign new operators to th
 
     op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs
+---------------
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
+:ref:`XComs <concepts:xcom>` values. In addition, you can wrap functions as tasks using the
+:ref:`task decorator <concepts:task_decorator>`. Dependencies are automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+  with DAG(
+      'send_server_ip', default_args=default_args, schedule_interval=None
+  ) as dag:
+
+    # Using default connection as it's set to httpbin.org by default
+    get_ip = SimpleHttpOperator(
+        task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+    )
+
+    @dag.task(multiple_outputs=True)
+    def prepare_email(raw_json: str) -> str:
+      external_ip = json.loads(raw_json)['origin']
+      return {
+        'subject':f'Server connected from {external_ip}',
+        'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
+      }
+
+    email_info = prepare_email(get_ip.output)

Review comment:
   The `output` part was already implemented in a previous PR. This PR focuses
   on `@task` only.

   The main difference between the approach proposed in the AIP and the current
   approach is that in the current approach we do not need to call normal
   operators. This should reduce the complexity of creating functional DAGs.

   In addition, existing operators may have mandatory templated fields (for
   example, `subject` is required in `EmailOperator` initialization). With the
   proposed approach we would need to overwrite them on call, which is a bit
   confusing (see the old example below or the example in the AIP).

   `output` is a way to access the XComArg of non-callable operators. This
   allows a smoother definition of functional DAGs that mix normal operators
   and `@task` operators.
   
   Old example:
   ```python
       get_ip = SimpleHttpOperator(
           task_id='get_ip', endpoint='get', method='GET', xcom_push=True
       )

       @dag.task(multiple_outputs=True)
       def prepare_email(raw_json: str) -> Dict[str, str]:
           external_ip = json.loads(raw_json)['origin']
           return {
               'subject': f'Server connected from {external_ip}',
               'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
           }

       # Normal operators have to be called to obtain their output.
       server_info = get_ip()
       email_info = prepare_email(server_info)

       # Mandatory fields need placeholder values at initialization ...
       send_email = EmailOperator(
           task_id='send_email',
           to='exam...@example.com',
           subject='',
           html_content=''
       )
       # ... and are then overwritten when the operator is called.
       send_email(subject=email_info['subject'], html_content=email_info['body'])
   ```
   
   New approach:
   ```python
   
       get_ip = SimpleHttpOperator(
           task_id='get_ip', endpoint='get', method='GET', xcom_push=True
       )

       @dag.task(multiple_outputs=True)
       def prepare_email(raw_json: str) -> Dict[str, str]:
           external_ip = json.loads(raw_json)['origin']
           return {
               'subject': f'Server connected from {external_ip}',
               'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
           }

       # `get_ip` is not called; its result is referenced through `.output`.
       email_info = prepare_email(get_ip.output)

       # Mandatory fields receive their real values at initialization.
       send_email = EmailOperator(
           task_id='send_email',
           to='exam...@example.com',
           subject=email_info['subject'],
           html_content=email_info['body']
       )
   ```
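
To make the mechanism behind `output` more concrete, here is a toy model in plain Python. It is only a sketch and not Airflow's implementation: `OperatorSketch` and `XComArgSketch` are hypothetical names invented for this illustration, and the real `XComArg` does more than this. It shows the two ideas the new approach relies on: `.output` returns a reference to a task's future result, and an operator that receives such a reference as an argument can infer its upstream dependency from it.

```python
from typing import Any, Set


class XComArgSketch:
    """Hypothetical stand-in for a reference to a task's future result."""

    def __init__(self, operator: "OperatorSketch", key: str = "return_value") -> None:
        self.operator = operator
        self.key = key

    def __getitem__(self, key: str) -> "XComArgSketch":
        # Indexing yields a reference to one entry of a multi-output result.
        return XComArgSketch(self.operator, key)


class OperatorSketch:
    """Hypothetical operator that infers upstream tasks from its arguments."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.kwargs = kwargs
        self.upstream_task_ids: Set[str] = set()
        # Any argument that is a result reference implies a dependency
        # on the task that produces it.
        for value in kwargs.values():
            if isinstance(value, XComArgSketch):
                self.upstream_task_ids.add(value.operator.task_id)

    @property
    def output(self) -> XComArgSketch:
        # The operator is never called; its result is only referenced.
        return XComArgSketch(self)


get_ip = OperatorSketch(task_id="get_ip")
prepare_email = OperatorSketch(task_id="prepare_email", raw_json=get_ip.output)
email_info = prepare_email.output
send_email = OperatorSketch(
    task_id="send_email",
    subject=email_info["subject"],
    html_content=email_info["body"],
)

print(prepare_email.upstream_task_ids)
print(send_email.upstream_task_ids)
```

In this sketch `send_email` gets its mandatory fields at construction time and still ends up downstream of `prepare_email`, which is the property the new approach is after.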
   
   I do believe the new approach is better. I'm happy to update the AIP and
   submit it for a vote if that is required.



