+1 for this idea. Something similar popped into my mind when I saw the Kubeflow approach some time ago.
T.

On Mon, Feb 3, 2020 at 8:05 PM Daniel Imberman <[email protected]> wrote:
>
> I like this idea a lot.
>
> We could create something similar to an “executor_config” so people can
> pre-populate most of the parameters necessary for a python_operator and pass
> it in
>
> e.g.
>
> config = AirflowPythonConfig(…)
>
> @airflow.make_python_operator(config)
> def my_func():
>
>
> On Mon, Feb 3, 2020 at 10:36 AM, Dan Davydov <[email protected]>
> wrote:
> I like it :). I think the difficulty in creating operators and chaining
> them together is one of the most common complaints about Airflow compared
> to other frameworks. Would be curious to see a comparison to other
> interfaces, e.g. Dagster, as well. I would be curious to see what other
> committers like Bolke think, but I think the next stage is a lightweight AIP
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-1%3A+Improve+Airflow+Security>
> if you want to continue on this
>
> On Tue, Jan 28, 2020 at 3:56 PM Gerard Casas Saez
> <[email protected]> wrote:
>
> > Hi everyone!
> >
> > Starting a conversation here about extending Airflow to support a more
> > functional way to define DAGs, including better data dependency/lineage
> > clarity on the DAG itself. I believe adding this functional extension would
> > allow Airflow to support more data pipeline use cases and extend its usage,
> > while being backwards compatible.
> >
> > My proposal is to include a __call__ function that would dynamically
> > replace class attributes before executing the `pre_execute` and `execute`
> > functions. This, tied with an XComArg, a class that points to a value
> > pushed to XCom by a previous task, allows defining DAGs in a more
> > functional and data-oriented way.
> > Basically my proposal is:
> >
> >
> > • Add a __call__ function in BaseOperator that accepts Inlets (in my case
> >   they are XComArgs)
> > • Add a simple Python decorator to create PythonOperators easily from
> >   functions
> > • Resolve the Inlets before any main operator execution on a
> >   worker/K8s pod…
> > • Log their values at execution time and make them visible in the Web UI
> >   for each task instance
> > • Set the attribute in the operator instance
> > • Execute the operator and return an XComArg that can later be tied into
> >   a new operator as an Inlet…
> >
> >
> > I wrote a small extension (~200 lines of code) that implements it. Here’s
> > what it would look like with existing operators (it works with some
> > simple tweaks to the existing codebase):
> >
> > get_ip = SimpleHttpOperator(
> >     task_id='get_ip', endpoint='get', method='GET', xcom_push=True
> > )
> >
> > @operation
> > def email_subject_generator(raw_json: str) -> str:
> >     external_ip = json.loads(raw_json)['origin']
> >     return f"Server connected from {external_ip}"
> >
> > @operation
> > def email_body_generator(raw_json: str) -> str:
> >     return """
> >     Seems like today your server executing Airflow is connected from the
> >     external IP {origin}<br>
> >     <br>
> >     Have a nice day!
> >     """.format(**json.loads(raw_json))
> >
> > send_email = EmailOperator(
> >     task_id='send_email',
> >     to="[email protected]",
> >     subject='',
> >     html_content=''
> > )
> >
> > ip_info = get_ip()
> > subject = email_subject_generator(ip_info)
> > body = email_body_generator(ip_info)
> > send_email(subject=subject, html_content=body)
> >
> > As someone wise once said, code is better than words, so here’s my
> > experimental code: https://github.com/casassg/corrent and a couple of
> > example DAGs I wrote: https://github.com/casassg/corrent/tree/master/dags
> >
> > This approach is heavily inspired by some alternatives that have recently
> > appeared in the ETL pipeline world (e.g. Dagster) that support more
> > functional ways of defining DAGs.
> >
> > I believe this approach could help the Airflow community grow. What
> > do you think? Any thoughts on functional DAGs? I would love to hear the
> > pros and cons of this expansion. To me, it seems closely linked to the
> > Lineage discussion happening at the same time, as both discussions are
> > about improving data pipelines.
> >
> > Thanks!
> >
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> >

--
Tomasz Urbaszek
Polidea | Software Engineer

M: +48 505 628 493
E: [email protected]
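
The mechanism in the quoted proposal can be illustrated without Airflow at all. What follows is a minimal sketch, not the code from the corrent repository: `XCOM_STORE`, `FunctionalOperator`, its `run()` method and the sample IP address are all made up for illustration, and only the names `XComArg` and `operation` are borrowed from the quoted message. Calling a task records its inlets and hands back an `XComArg`; the referenced values are resolved only when the task actually runs.

```python
import json
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical in-memory stand-in for Airflow's XCom backend.
XCOM_STORE: Dict[str, Any] = {}


class XComArg:
    """Reference to the value that a (previous) task pushes to XCom."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def resolve(self) -> Any:
        return XCOM_STORE[self.task_id]


def _resolve(value: Any) -> Any:
    # Plain values pass through; XComArg references are looked up.
    return value.resolve() if isinstance(value, XComArg) else value


class FunctionalOperator:
    """Toy operator (assumption, not BaseOperator): __call__ wires, run() executes."""

    def __init__(self, task_id: str, fn: Callable[..., Any]) -> None:
        self.task_id = task_id
        self.fn = fn
        self.args: Tuple[Any, ...] = ()
        self.kwargs: Dict[str, Any] = {}
        self.upstream: List[str] = []

    def __call__(self, *args: Any, **kwargs: Any) -> XComArg:
        # Record the inlets and the dependencies they imply; nothing runs yet.
        self.args, self.kwargs = args, kwargs
        self.upstream = [
            a.task_id for a in (*args, *kwargs.values()) if isinstance(a, XComArg)
        ]
        return XComArg(self.task_id)

    def run(self) -> None:
        # Resolve inlets first, then execute and push the result.
        args = [_resolve(a) for a in self.args]
        kwargs = {k: _resolve(v) for k, v in self.kwargs.items()}
        XCOM_STORE[self.task_id] = self.fn(*args, **kwargs)


def operation(fn: Callable[..., Any]) -> FunctionalOperator:
    """Decorator that turns a plain function into a toy operator."""
    return FunctionalOperator(task_id=fn.__name__, fn=fn)


@operation
def get_ip() -> str:
    # Stand-in for the HTTP call in the quoted example; the address is made up.
    return json.dumps({"origin": "203.0.113.7"})


@operation
def email_subject_generator(raw_json: str) -> str:
    external_ip = json.loads(raw_json)["origin"]
    return f"Server connected from {external_ip}"


# "DAG definition": calls only wire tasks together.
ip_info = get_ip()
subject = email_subject_generator(ip_info)

# "Execution": run the tasks in dependency order.
for task in (get_ip, email_subject_generator):
    task.run()

print(XCOM_STORE["email_subject_generator"])
```

A real scheduler would derive the run order from each task's `upstream` list rather than from a hand-written loop; the loop here only keeps the sketch short.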
