I like this idea a lot. We could create something similar to an “executor_config” so people can pre-populate most of the parameters necessary for a python_operator and pass it in
e.g.

config = AirflowPythonConfig(…)

@airflow.make_python_operator(config)
def my_func():

On Mon, Feb 3, 2020 at 10:36 AM, Dan Davydov <ddavy...@twitter.com.invalid> wrote:

I like it :). I think the difficulty of creating operators and chaining them together is one of the most common complaints about Airflow compared to other frameworks. I would be curious to see a comparison with other interfaces, e.g. Dagster, as well. I would also like to hear what other committers like Bolke think, but I think the next stage is a lightweight AIP <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-1%3A+Improve+Airflow+Security> if you want to continue on this.

On Tue, Jan 28, 2020 at 3:56 PM Gerard Casas Saez <gcasass...@twitter.com.invalid> wrote:

> Hi everyone!
>
> Starting a conversation here about extending Airflow to support a more
> functional way to define DAGs, including better data dependency/lineage
> clarity on the DAG itself. I believe adding this functional extension
> would allow Airflow to support more data pipeline use cases and extend
> its usage, while being backwards compatible.
>
> My proposal is to include a __call__ function that would dynamically
> replace class attributes before executing the `pre_execute` and `execute`
> functions. This, tied with an XComArg, a class that points to the XCom
> value pushed by a previous task, allows DAGs to be defined in a more
> functional, data-oriented way. Basically my proposal is:
>
> • Add a __call__ function in BaseOperator that accepts Inlets (in my
>   case it's XComArgs)
> • Add a simple Python decorator to create PythonOperators easily from
>   functions
> • Resolve them before executing any main operator execution on a
>   worker/K8s pod…
> • Log their values at execution time and make them visible in the Web UI
>   for each task instance
> • Set the attribute in the operator instance
> • Execute the operator and return an XComArg that can later be tied to
>   a new operator as an Inlet…
>
> I wrote a small extension (~200 lines of code) that implements it. Here's
> what it would look like with existing operators (and that works with some
> simple tweaks on the already existing codebase):
>
> get_ip = SimpleHttpOperator(
>     task_id='get_ip', endpoint='get', method='GET', xcom_push=True
> )
>
> @operation
> def email_subject_generator(raw_json: str) -> str:
>     external_ip = json.loads(raw_json)['origin']
>     return f"Server connected from {external_ip}"
>
> @operation
> def email_body_generator(raw_json: str) -> str:
>     return """
>     Seems like today your server executing Airflow is connected from the external IP {origin}<br>
>     <br>
>     Have a nice day!
>     """.format(**json.loads(raw_json))
>
> send_email = EmailOperator(
>     task_id='send_email',
>     to="exam...@example.com",
>     subject='',
>     html_content=''
> )
>
> ip_info = get_ip()
> subject = email_subject_generator(ip_info)
> body = email_body_generator(ip_info)
> send_email(subject=subject, html_content=body)
>
> As someone wise once said, code is better than words, so here's my
> experimental code: https://github.com/casassg/corrent and a couple of
> example DAGs I wrote: https://github.com/casassg/corrent/tree/master/dags
>
> This approach is heavily inspired by some alternatives that have lately
> appeared in the ETL pipeline world (e.g. Dagster) that support more
> functional ways of defining DAGs.
>
> I believe this approach could help the Airflow community grow. What do
> you think? Any thoughts on functional DAGs? Would love to hear what the
> pros and cons of this expansion are. To me, it seems closely linked to
> the Lineage discussion happening at the same time, as both discussions
> are about improving data pipelines.
>
> Thanks!
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
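
For anyone who wants to poke at the mechanics of the quoted proposal without installing anything, here is a rough, self-contained toy model of the idea: calling an operator records its inlets and returns a reference to its future output, and the inlets are resolved and set as attributes just before execution. This is only a sketch under stated assumptions. It is not the corrent code and not Airflow's API: ToyOperator, ToyXComArg, ConstantOperator, RecordingEmailOperator and the in-memory XCOM_STORE are all hypothetical names made up for illustration, and the IP address is a made-up documentation value. Only `operation` and the task names mirror the quoted example.

```python
import json
from typing import Any, Callable, Dict, Set, Tuple

# Hypothetical in-memory stand-in for an XCom backend (assumption).
XCOM_STORE: Dict[str, Any] = {}


class ToyXComArg:
    """Reference to the value that a task will push when it runs."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def resolve(self) -> Any:
        return XCOM_STORE[self.task_id]


class ToyOperator:
    """Toy operator; deliberately not Airflow's BaseOperator."""

    def __init__(self, task_id: str, **attrs: Any) -> None:
        self.task_id = task_id
        self.upstream: Set[str] = set()
        self.pending: Dict[str, ToyXComArg] = {}
        for name, value in attrs.items():
            setattr(self, name, value)

    def __call__(self, **inlets: ToyXComArg) -> ToyXComArg:
        # Remember which attributes to replace later and record the dependency.
        for name, arg in inlets.items():
            self.pending[name] = arg
            self.upstream.add(arg.task_id)
        return ToyXComArg(self.task_id)

    def run(self) -> None:
        # Resolve inlets, set them as attributes, then execute and push.
        for name, arg in self.pending.items():
            setattr(self, name, arg.resolve())
        XCOM_STORE[self.task_id] = self.execute()

    def execute(self) -> Any:
        raise NotImplementedError


class ToyPythonOperator(ToyOperator):
    """Wraps a plain function; inlets are passed positionally."""

    def __init__(self, func: Callable[..., Any]) -> None:
        super().__init__(task_id=func.__name__)
        self.func = func
        self.args: Tuple[ToyXComArg, ...] = ()

    def __call__(self, *args: ToyXComArg) -> ToyXComArg:
        self.args = args
        self.upstream.update(arg.task_id for arg in args)
        return ToyXComArg(self.task_id)

    def execute(self) -> Any:
        return self.func(*(arg.resolve() for arg in self.args))


def operation(func: Callable[..., Any]) -> ToyPythonOperator:
    """Decorator that turns a function into a toy operator."""
    return ToyPythonOperator(func)


class ConstantOperator(ToyOperator):
    """Stands in for an HTTP call by returning a fixed payload."""

    def execute(self) -> Any:
        return self.value


class RecordingEmailOperator(ToyOperator):
    """Stands in for sending mail by returning what it would send."""

    def execute(self) -> Any:
        return {"subject": self.subject, "html_content": self.html_content}


get_ip = ConstantOperator(task_id="get_ip", value='{"origin": "203.0.113.7"}')


@operation
def email_subject_generator(raw_json: str) -> str:
    external_ip = json.loads(raw_json)["origin"]
    return f"Server connected from {external_ip}"


send_email = RecordingEmailOperator(
    task_id="send_email", subject="", html_content=""
)

ip_info = get_ip()
subject = email_subject_generator(ip_info)
sent = send_email(subject=subject)

# No scheduler in this toy: run the tasks by hand in dependency order.
for task in (get_ip, email_subject_generator, send_email):
    task.run()
```

Note that nothing is computed when the operators are called; the calls only wire up dependencies (see each task's `upstream` set), and the values flow through XCOM_STORE when run() is invoked. A real implementation would let the scheduler derive the execution order from those dependencies instead of running tasks by hand.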