I like it :). I think the difficulty of creating operators and chaining
them together is one of the most common complaints about Airflow compared
to other frameworks. I would be curious to see a comparison with other
interfaces, e.g. Dagster, as well. I would also like to hear what other
committers like Bolke think, but I think the next stage is a lightweight AIP
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-1%3A+Improve+Airflow+Security>
if you want to continue with this.

On Tue, Jan 28, 2020 at 3:56 PM Gerard Casas Saez
<gcasass...@twitter.com.invalid> wrote:

> Hi everyone!
>
> Starting a conversation here about extending Airflow to support a more
> functional way to define DAGs, including clearer data dependencies and
> lineage on the DAG itself. I believe adding this functional extension would
> let Airflow support more data pipeline use cases and extend its usage,
> while staying backwards compatible.
>
> My proposal is to add a __call__ function that dynamically replaces
> class attributes before the `pre_execute` and `execute` functions run.
> Combined with XComArg, a class that points to the value a previous task
> pushed to XCom, this allows defining DAGs in a more functional,
> data-oriented way. Basically, my proposal is:
>
>
> • Add a __call__ function in BaseOperator that accepts Inlets (in my case,
> XComArgs)
> • Add a simple Python decorator to create PythonOperators easily from
> functions
> • Resolve the Inlets on the worker/K8s pod before the operator's main
> execution…
> • Log their values at execution time and make them visible in the Web UI
> for each task instance
> • Set them as attributes on the operator instance
> • Execute the operator and return an XComArg that can later be passed to
> a new operator as an Inlet…
>
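A minimal sketch of the flow in the list above, in plain Python with no
Airflow imports. `ToyOperator`, `XCOM_STORE`, `GetIp`, `SendEmail`, `run`, the
hard-coded IP, and the `ops@example.com` address are all hypothetical
stand-ins for illustration; this is not the code in the linked extension or
Airflow's actual API. It only shows the idea: `__call__` records inlets at
DAG-definition time, and they are resolved, logged, and set as attributes
just before `execute`.

```python
# Toy model of the proposal; none of these classes are Airflow's real ones.
from typing import Any, Dict

# Stand-in for the XCom backend: task_id -> pushed return value.
XCOM_STORE: Dict[str, Any] = {}


class XComArg:
    """Reference to the value a previous task pushed to XCom."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def resolve(self) -> Any:
        return XCOM_STORE[self.task_id]


class ToyOperator:
    """Hypothetical operator base class with the proposed __call__."""

    def __init__(self, task_id: str, **attrs: Any) -> None:
        self.task_id = task_id
        self.inlets: Dict[str, XComArg] = {}
        for name, value in attrs.items():
            setattr(self, name, value)

    def __call__(self, **inlets: XComArg) -> XComArg:
        # DAG-definition time: only record which attributes to replace later.
        self.inlets.update(inlets)
        return XComArg(self.task_id)

    def run(self) -> None:
        # Execution time: resolve inlets, log them, set attributes, execute.
        for name, arg in self.inlets.items():
            value = arg.resolve()
            print(f"[{self.task_id}] {name} = {value!r}")
            setattr(self, name, value)
        XCOM_STORE[self.task_id] = self.execute()

    def execute(self) -> Any:
        raise NotImplementedError


class GetIp(ToyOperator):
    def execute(self) -> str:
        return "203.0.113.7"  # documentation-range IP, hard-coded for the sketch


class SendEmail(ToyOperator):
    def execute(self) -> str:
        return f"to={self.to} subject={self.subject}"


get_ip = GetIp(task_id="get_ip")
send_email = SendEmail(task_id="send_email", to="ops@example.com", subject="")

ip_info = get_ip()                    # XComArg pointing at get_ip's result
result = send_email(subject=ip_info)  # nothing is executed yet

# A scheduler would do this; here the tasks are run by hand, in order.
get_ip.run()
send_email.run()
```

Note that `subject=''` in the constructor is only a placeholder; the value
that `execute` sees is the one resolved from the inlet.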
>
> I wrote a small extension (~200 lines of code) that implements it. Here’s
> what it would look like with existing operators (this works with some
> simple tweaks to the existing codebase):
>
> import json
>
> # `operation` is the decorator from the experimental extension linked below.
>
> get_ip = SimpleHttpOperator(
>     task_id='get_ip', endpoint='get', method='GET', xcom_push=True
> )
>
> @operation
> def email_subject_generator(raw_json: str) -> str:
>     external_ip = json.loads(raw_json)['origin']
>     return f"Server connected from {external_ip}"
>
> @operation
> def email_body_generator(raw_json: str) -> str:
>     return """
>     Seems like today your server executing Airflow is connected from the external IP {origin}<br>
>     <br>
>     Have a nice day!
>     """.format(**json.loads(raw_json))
>
> send_email = EmailOperator(
>     task_id='send_email',
>     to="exam...@example.com",
>     subject='',
>     html_content=''
> )
>
> # Calling an operator returns a reference to its XCom result.
> ip_info = get_ip()
> subject = email_subject_generator(ip_info)
> body = email_body_generator(ip_info)
> send_email(subject=subject, html_content=body)
>
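One way to picture what a decorator like `@operation` in the example above
could do, again in plain Python. This is an assumption-laden sketch, not the
extension's implementation: `Ref`, `TASKS`, `UPSTREAM`, `RESULTS`, and
`run_all` are hypothetical names, the `operation` defined here is a toy
re-implementation, and `get_ip` returns a canned JSON string instead of
making an HTTP call. The point is that dependencies fall out of the data
flow, so no explicit `set_upstream` calls are needed.

```python
import functools
import json
from typing import Any, Callable, Dict, List, Tuple

RESULTS: Dict[str, Any] = {}         # stand-in for XCom
UPSTREAM: Dict[str, List[str]] = {}  # task_id -> task_ids it reads from
TASKS: Dict[str, Tuple[Callable[..., Any], Tuple[Any, ...]]] = {}


class Ref:
    """Hypothetical placeholder for a task's future return value."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


def operation(fn: Callable[..., Any]) -> Callable[..., Ref]:
    """Toy decorator: calling fn registers a task instead of running it."""
    task_id = fn.__name__

    @functools.wraps(fn)
    def build(*args: Any) -> Ref:
        # Every Ref argument is a data dependency on the task that produced it.
        UPSTREAM[task_id] = [a.task_id for a in args if isinstance(a, Ref)]
        TASKS[task_id] = (fn, args)
        return Ref(task_id)

    return build


def run_all() -> None:
    # Registration order is already a valid execution order here, because a
    # Ref can only be passed along after its producing task was declared.
    for task_id, (fn, args) in TASKS.items():
        resolved = [RESULTS[a.task_id] if isinstance(a, Ref) else a for a in args]
        RESULTS[task_id] = fn(*resolved)


@operation
def get_ip() -> str:
    return json.dumps({"origin": "203.0.113.7"})  # canned response for the sketch


@operation
def email_subject_generator(raw_json: str) -> str:
    external_ip = json.loads(raw_json)["origin"]
    return f"Server connected from {external_ip}"


ip_info = get_ip()                          # registers the task, returns a Ref
subject = email_subject_generator(ip_info)  # dependency on get_ip is inferred
run_all()
```

After `run_all()`, `UPSTREAM` holds the inferred edges and `RESULTS` holds each
task's return value, which is roughly the lineage information the proposal
wants visible on the DAG.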
> As someone wise once said, code is better than words, so here’s my
> experimental code: https://github.com/casassg/corrent and a couple of
> example DAGs I wrote: https://github.com/casassg/corrent/tree/master/dags
>
> This approach is heavily inspired by alternatives that have recently
> appeared in the ETL pipeline world (e.g. Dagster) and support more
> functional ways of defining DAGs.
>
> I believe this approach could help the Airflow community grow. What
> do you think? Any thoughts on functional DAGs? I would love to hear what
> the pros and cons of this extension are. To me, it seems closely linked to
> the Lineage discussion happening at the same time, as both discussions are
> about improving data pipelines.
>
> Thanks!
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
>
