+1 for this idea. Something similar popped into my mind when I saw the
Kubeflow approach some time ago.

T.


On Mon, Feb 3, 2020 at 8:05 PM Daniel Imberman
<[email protected]> wrote:
>
> I like this idea a lot.
>
> We could create something similar to an “executor_config” so people can
> pre-populate most of the parameters a PythonOperator needs in a config object
> and pass it in.
>
> e.g.
>
> config = AirflowPythonConfig(…)
>
> @airflow.make_python_operator(config)
> def my_func():
>     ...
>
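A minimal sketch of the `airflow.make_python_operator(config)` idea above. Everything here is hypothetical: `PythonTaskConfig`, `make_python_operator` and `PythonTask` are illustrative names, not Airflow APIs, and a plain Python object stands in for a real PythonOperator. It shows shared defaults living in one immutable config, with per-task overrides producing a copy:

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class PythonTaskConfig:
    """Hypothetical bundle of pre-populated parameters shared by many tasks."""

    retries: int = 0
    queue: str = "default"
    op_kwargs: Dict[str, Any] = field(default_factory=dict)


@dataclass
class PythonTask:
    """Toy stand-in for a PythonOperator: a callable plus its parameters."""

    task_id: str
    python_callable: Callable[..., Any]
    config: PythonTaskConfig

    def execute(self) -> Any:
        # Call the wrapped function with the pre-populated keyword arguments.
        return self.python_callable(**self.config.op_kwargs)


def make_python_operator(
    config: PythonTaskConfig, **overrides: Any
) -> Callable[[Callable[..., Any]], PythonTask]:
    """Hypothetical decorator factory turning a function into a PythonTask.

    Per-task overrides produce a modified copy; the shared config is untouched.
    """
    task_config = replace(config, **overrides)

    def decorator(func: Callable[..., Any]) -> PythonTask:
        # Use the function name as the task_id (an assumed convention).
        return PythonTask(task_id=func.__name__, python_callable=func, config=task_config)

    return decorator


config = PythonTaskConfig(retries=3, op_kwargs={"name": "Airflow"})


@make_python_operator(config, queue="high_priority")
def my_func(name: str) -> str:
    return f"Hello, {name}!"


print(my_func.task_id, my_func.config.retries, my_func.config.queue)  # my_func 3 high_priority
print(my_func.execute())  # Hello, Airflow!
```

Freezing the config is one way to keep a shared default from being mutated by a single task; overrides go through `dataclasses.replace` instead.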
>
> On Mon, Feb 3, 2020 at 10:36 AM, Dan Davydov <[email protected]> 
> wrote:
> I like it :). I think the difficulty of creating operators and chaining
> them together is one of the most common complaints about Airflow compared
> to other frameworks. I'd also be curious to see a comparison with other
> interfaces, e.g. Dagster. I'd like to hear what other committers such as
> Bolke think, but I think the next step is a lightweight AIP
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-1%3A+Improve+Airflow+Security>
> if you want to continue with this.
>
> On Tue, Jan 28, 2020 at 3:56 PM Gerard Casas Saez
> <[email protected]> wrote:
>
> > Hi everyone!
> >
> > I'm starting a conversation here about extending Airflow to support a more
> > functional way of defining DAGs, including clearer data dependencies and
> > lineage on the DAG itself. I believe this functional extension would
> > support more data pipeline use cases and broaden Airflow usage while
> > remaining backwards compatible.
> >
> > My proposal is to add a __call__ method that dynamically replaces operator
> > attributes before the `pre_execute` and `execute` methods run. Combined
> > with XComArg, a class that points to a value a previous task pushed to
> > XCom, this allows DAGs to be defined in a more functional, data-driven way.
> > In short, my proposal is:
> >
> >
> > • Add a __call__ method to BaseOperator that accepts inlets (in my case,
> > XComArgs)
> > • Add a simple Python decorator to create PythonOperators easily from
> > functions
> > • Resolve the inlets before the main operator execution on a worker/K8s
> > pod
> > • Log their values at execution time and make them visible in the Web UI
> > for each task instance
> > • Set them as attributes on the operator instance
> > • Execute the operator and return an XComArg that can later be passed to
> > a new operator as an inlet
> >
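A toy sketch of the __call__/XComArg mechanism described in the list above, under loudly stated assumptions: `BaseOperator` and `XComArg` here are minimal hypothetical classes written for illustration (not Airflow's classes, and not the corrent code), `XCOM_STORE` is an in-memory dict standing in for the XCom table, and `run()` plays the role of the worker. Calling an operator only records its inlets; resolution, logging and attribute setting happen in `run()`:

```python
import logging
from typing import Any, Dict, Tuple

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("functional_dag_sketch")

# Hypothetical in-memory stand-in for the XCom table: (task_id, key) -> value.
XCOM_STORE: Dict[Tuple[str, str], Any] = {}


class XComArg:
    """Hypothetical pointer to a value an earlier task pushed to XCom."""

    def __init__(self, operator: "BaseOperator", key: str = "return_value") -> None:
        self.operator = operator
        self.key = key

    def resolve(self) -> Any:
        # Looked up lazily, i.e. at execution time, not at DAG-definition time.
        return XCOM_STORE[(self.operator.task_id, self.key)]


class BaseOperator:
    """Toy operator for illustration only; this is NOT Airflow's BaseOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self._inlets: Dict[str, Any] = {}

    def __call__(self, **inlets: Any) -> XComArg:
        # Only record the inlets here; nothing is resolved yet.
        self._inlets = inlets
        return XComArg(self)

    def run(self) -> None:
        # What would happen on the worker: resolve, log, set attributes, execute.
        for name, value in self._inlets.items():
            if isinstance(value, XComArg):
                value = value.resolve()
            log.info("%s: %s=%r", self.task_id, name, value)
            setattr(self, name, value)
        XCOM_STORE[(self.task_id, "return_value")] = self.execute()

    def execute(self) -> Any:
        raise NotImplementedError


class ConstantOperator(BaseOperator):
    """Pushes a fixed value, standing in for e.g. an HTTP call."""

    def __init__(self, task_id: str, value: Any) -> None:
        super().__init__(task_id)
        self.value = value

    def execute(self) -> Any:
        return self.value


class UpperOperator(BaseOperator):
    """Uppercases its `text` attribute, which __call__ fills in."""

    text: str = ""

    def execute(self) -> str:
        return self.text.upper()


source = ConstantOperator("source", "hello")
upper = UpperOperator("upper")
result = upper(text=source())  # wiring only; no values exist yet

# Run in dependency order, as a scheduler would.
source.run()
upper.run()
print(result.resolve())  # HELLO
```

Deferring resolution to `run()` is what keeps DAG parsing free of data access: at definition time an XComArg is just a reference to a task and a key.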
> >
> > I wrote a small extension (~200 lines of code) that implements this. Here’s
> > what it would look like with existing operators (it works with a few
> > simple tweaks to the existing codebase):
> >
> > import json
> >
> > # Airflow 1.10-style import paths.
> > from airflow.operators.email_operator import EmailOperator
> > from airflow.operators.http_operator import SimpleHttpOperator
> > # `operation` is the decorator provided by the corrent extension linked below.
> > # These lines are assumed to run inside a `with DAG(...):` block.
> >
> > get_ip = SimpleHttpOperator(
> >     task_id='get_ip', endpoint='get', method='GET', xcom_push=True
> > )
> >
> > @operation
> > def email_subject_generator(raw_json: str) -> str:
> >     external_ip = json.loads(raw_json)['origin']
> >     return f"Server connected from {external_ip}"
> >
> > @operation
> > def email_body_generator(raw_json: str) -> str:
> >     return """
> >     Seems like today your server executing Airflow is connected from the
> >     external IP {origin}<br>
> >     <br>
> >     Have a nice day!
> >     """.format(**json.loads(raw_json))
> >
> > send_email = EmailOperator(
> >     task_id='send_email',
> >     to="[email protected]",
> >     subject='',
> >     html_content=''
> > )
> >
> > # Calling an operator wires its inputs and returns an XComArg.
> > ip_info = get_ip()
> > subject = email_subject_generator(ip_info)
> > body = email_body_generator(ip_info)
> > send_email(subject=subject, html_content=body)
> >
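The example above never sets explicit `>>` dependencies, so presumably the task order is meant to follow from the data flow itself. A toy sketch of how that inference could work, assuming a hypothetical `Task` registry, a stand-in `operation` decorator, and stub functions in place of the HTTP and email operators (none of this is the corrent code or an Airflow API). It uses `graphlib.TopologicalSorter` from the Python 3.9+ standard library:

```python
import json
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Any, Callable, Dict, List


class XComArg:
    """Hypothetical handle to a task's future output (not Airflow's class)."""

    def __init__(self, task: "Task") -> None:
        self.task = task


class Task:
    """Toy task: a function plus the arguments it was called with."""

    registry: List["Task"] = []  # every task defined in this toy "DAG"

    def __init__(self, task_id: str, func: Callable[..., Any]) -> None:
        self.task_id = task_id
        self.func = func
        self.args: tuple = ()
        self.kwargs: Dict[str, Any] = {}
        Task.registry.append(self)

    def __call__(self, *args: Any, **kwargs: Any) -> XComArg:
        # Record the arguments; this only wires the graph, nothing runs yet.
        self.args, self.kwargs = args, kwargs
        return XComArg(self)

    def upstream(self) -> List["Task"]:
        # Every XComArg among the arguments becomes an upstream dependency.
        values = list(self.args) + list(self.kwargs.values())
        return [v.task for v in values if isinstance(v, XComArg)]


def operation(func: Callable[..., Any]) -> Task:
    """Hypothetical stand-in for the proposal's @operation decorator."""
    return Task(func.__name__, func)


def run_all() -> Dict[str, Any]:
    """Run every task in dependency order, feeding outputs downstream."""
    graph = {task: task.upstream() for task in Task.registry}
    results: Dict[Task, Any] = {}

    def resolve(value: Any) -> Any:
        return results[value.task] if isinstance(value, XComArg) else value

    for task in TopologicalSorter(graph).static_order():
        args = [resolve(a) for a in task.args]
        kwargs = {k: resolve(v) for k, v in task.kwargs.items()}
        results[task] = task.func(*args, **kwargs)
    return {task.task_id: output for task, output in results.items()}


@operation
def get_ip() -> str:
    # Stub for the SimpleHttpOperator call: a canned JSON body (documentation IP).
    return json.dumps({"origin": "203.0.113.7"})


@operation
def email_subject_generator(raw_json: str) -> str:
    external_ip = json.loads(raw_json)["origin"]
    return f"Server connected from {external_ip}"


@operation
def send_email(subject: str) -> str:
    # Stub for EmailOperator: return what would be sent instead of sending it.
    return f"sent: {subject}"


# Same wiring style as the example above; no explicit >> anywhere.
ip_info = get_ip()
subject = email_subject_generator(ip_info)
send_email(subject=subject)

print(run_all()["send_email"])  # sent: Server connected from 203.0.113.7
```

The graph here is derived purely from which XComArgs each call received, which is the property that would let the functional style coexist with explicit `>>` wiring.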
> > As someone wise once said, code is better than words, so here’s my
> > experimental code: https://github.com/casassg/corrent and a couple of
> > example DAGs I wrote: https://github.com/casassg/corrent/tree/master/dags
> >
> > This approach is heavily inspired by alternatives that have recently
> > appeared in the ETL pipeline world (e.g. Dagster) and support more
> > functional ways of defining DAGs.
> >
> > I believe this approach could help the Airflow community grow. What do you
> > think? Any thoughts on functional DAGs? I'd love to hear the pros and cons
> > of this extension. To me, it seems closely linked to the lineage discussion
> > happening at the same time, as both discussions are about improving data
> > pipelines.
> >
> > Thanks!
> >
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> >



-- 

Tomasz Urbaszek
Polidea | Software Engineer

M: +48 505 628 493
E: [email protected]
