Here are a couple more examples of where, as an Airflow user, I am
introducing state into my tasks (so they technically break idempotency):
* Authentication mechanisms that return errors on wrong credentials: state
is kept so retries don't make repeated login attempts, which would get the
account locked; in this case manual intervention is required to re-enable
the tasks.
* Sensors that send email/SMS notifications on certain events: state is
kept so notifications aren't redundantly re-sent on retry/reschedule, and
also so they can "escalate" in urgency each time they are sent out, even
across retries/reschedules (see the sketch after this list).
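
For concreteness, the escalation pattern above looks roughly like this in
our setup -- a minimal sketch, assuming Variables as the scratchpad; the
key scheme and the send_notification() helper are made up for illustration:

    from airflow.models import Variable

    def notify_with_escalation(dag_id, task_id, run_key):
        # One state key per dag/task/run; the naming is illustrative only.
        key = "notify_level:%s:%s:%s" % (dag_id, task_id, run_key)
        # Defaults to 0 on the first attempt, climbs on each retry.
        level = int(Variable.get(key, default_var=0))
        send_notification(urgency=level)  # hypothetical sender
        Variable.set(key, level + 1)      # survives retries/reschedules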

I understand wanting to keep Airflow tasks as stateless as possible, but
from my perspective real-world systems are not stateless (or free of side
effects), and modeling that state is sometimes the only viable solution.
The motivation of this thread is that some users are saying they need
state; I can tell you at least some users are already hacking state into
their existing Airflow setups, and would greatly appreciate it if, when
this is implemented, it doesn't have arbitrary limits on its use :). My 2
cents as a user anyway.

Damian

-----Original Message-----
From: Daniel Standish <dpstand...@gmail.com> 
Sent: Monday, January 13, 2020 14:03
To: dev@airflow.apache.org
Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling 
in Operators

@Jarek & Fokko
Firstly, thank you for the thorough and thoughtful replies -- I appreciate it.

I recognize that the execution_date-based idempotence pattern is core to 
airflow.  I understand that it motivated a lot of the design decisions, and 
that it is a pattern that is endorsed and explicitly encouraged in the 
documentation, and by the community in general.

But surely it's not the position of the airflow project that data pipelines 
deviating from this pattern are officially forbidden and unsupported?

I think there is a place in airflow for tasks that are not driven by 
"execution_date", as well as tasks for which "airflow backfill" does not make 
sense.  Airflow provides great flexibility, and a lot of value and applications 
beyond that one pattern.

I believe there are cases where it makes sense to persist some information
for a task or a dag.  Incremental processes are one.  The reschedule poke
operator is another.  Breaking a heavy data transfer process into
sub-batches and storing progress is another.  And I'd be in favor of
providing a better built-in mechanism to do it within airflow.  I would not
constrain it to async job ids; just arbitrary data, however the user wants
to use it.

I am not convinced that adding the capacity to store information associated 
with dag+task undermines or meaningfully alters the airflow project.

I would bet that pipelines not following the canonical pattern are pretty
common in the wild.  And when you don't, for whatever reason, lean on
execution_date, it's nice to have a different mechanism for storing state.

That's my take.

Detailed responses below.

@Jarek
RE: the "cluster maintainer" role, and the "idempotence guarantee", and "by 
adding state you lose the property of idempotency"

I don't think airflow provides any such guarantee as it is.

   - By themselves, even built-in airflow operators do not in general
   provide an idempotence guarantee.  It really depends on how they are used.
      - For example, a _SqlOperator has a `sql` field.  Whether the task is
      idempotent or not depends entirely on the `sql` that is passed to the
      operator (see the sketch below).
   - With non-built-in operators, all bets are off.
   - Variables and XCom can be used to persist and transmit state.
   - People use catchup=False to facilitate non-idempotent tasks.
   - As a scheduler, it can execute arbitrary processes.

These facts mean that as a cluster maintainer, if there is an idempotence 
guarantee, it's because the team enforces it -- not because airflow guarantees 
it.  Airflow is simply unable to enforce this by itself.
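
To make the _SqlOperator point concrete, here is the kind of thing I
mean -- a sketch only; the table and connection names are invented, and
assume these sit inside a DAG definition:

    from airflow.operators.postgres_operator import PostgresOperator

    # Idempotent: re-running an interval replaces that interval's rows.
    load_replace = PostgresOperator(
        task_id="load_sales_replace",
        postgres_conn_id="warehouse",
        sql="""
            DELETE FROM sales WHERE ds = '{{ ds }}';
            INSERT INTO sales
            SELECT * FROM staging_sales WHERE ds = '{{ ds }}';
        """,
    )

    # Not idempotent: every retry or backfill appends duplicate rows.
    load_append = PostgresOperator(
        task_id="load_sales_append",
        postgres_conn_id="warehouse",
        sql="INSERT INTO sales SELECT * FROM staging_sales;",
    )

Same operator class, opposite idempotency properties, and airflow cannot
tell the difference.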

@Jarek & Fokko
RE: you are using the wrong tool for this job, airflow is not for streaming, 
you should look at apache beam, read "beyond the horizon"

Incremental processes are not necessarily streaming processes.  The incremental 
scenarios I outlined are still batch processes -- hourly, every
4 hours, daily, that kind of thing.  I am just determining the intervals in a 
different way from an execution_date-driven process.
Should I use another tool?  Fundamentally, what we're talking about is the 
ability to store a small bit of data associated with a task+dag.  When I 
consider this gap in the context of everything airflow provides, it's 
insignificant by comparison, and there are ways I can solve it:

   - xcom (notwithstanding edge case problems, e.g. getting cleared;
   sketched below)
   - variables (notwithstanding messiness)
   - roll your own model (notwithstanding effort and complexity on my
   system)
      - incidentally, because of airflow's plugin system, if you do roll
      your own, you can even expose your in-house model through an admin view in
      airflow!
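
For what it's worth, the xcom route can look something like this -- a
sketch, assuming it is wired up as a PythonOperator with
provide_context=True; process_since() is a stand-in for the actual
incremental step:

    def do_incremental_work(**context):
        ti = context["ti"]
        # Reach back past this execution_date to the previous run's value.
        last = ti.xcom_pull(
            task_ids="do_incremental_work",
            key="high_watermark",
            include_prior_dates=True,
        )
        new_mark = process_since(last)  # hypothetical incremental step
        ti.xcom_push(key="high_watermark", value=new_mark)

And this is exactly where the "getting cleared" edge case bites: clear the
old task instance and the watermark goes with it.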

Nonetheless, I don't think we should *make* users solve it.  And I don't
think we need beam or spark to, e.g., schedule sql scripts, merely because
they are incremental instead of idempotent.

@Jarek
RE: adding support for state adds more work for committers

Maybe any feature that is new can create more work.  But there is also work
it may eliminate.  By providing explicit support for this design pattern,
users would not have to force it into XCom or Variable.  We could have a
docs page "howto/incremental_process".  This could reduce questions on "how
can I do an incremental process?".  By *not* providing support, the burden
is shifted to users, who will find a way, but at some cost.
As you have said, we could tell users this is not supported, but why turn 
someone away when the platform can actually handle that pattern just fine?

@Fokko
RE:

> >  This value needs to be stored somewhere.
> This data is already available, but would you want to duplicate this 
> in Airflow for optimization, my answer would be: no.


Let me try to clarify.  The context here was a dim / fact / reporting
process that is downstream of several tables with varying delivery
cadences.  When an incremental load begins, you need to record the state of
the source tables.  Storing the min of the max modified dates from the
source tables lets us know what we need to process in the next run.  This
value truly does need to be stored somewhere; there is no way to figure it
out after the fact by querying the content of the target.
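
Roughly, the bookkeeping I am describing is the following -- a sketch; the
connection, hook, and table names are hypothetical:

    from airflow.hooks.postgres_hook import PostgresHook

    def capture_source_state(source_tables):
        hook = PostgresHook(postgres_conn_id="sources")
        max_dates = [
            hook.get_first("SELECT max(modified_date) FROM %s" % t)[0]
            for t in source_tables
        ]
        # The slowest source bounds what the next run can safely process;
        # this is the value that has to be persisted somewhere.
        return min(max_dates)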

@Fokko
RE:

> Also, if you have a modern data platform, fetching the watermark can 
> be constant. I've also mentioned this on the Slack


Yes, in many data processes it is possible to avoid storing a watermark.
Other times it may not be possible.  Still other times, while it may be
possible to avoid it, storing a watermark is the practical and elegant
solution.  Querying the target is not without its own pitfalls and
complexities.  Maybe the target is a database where you must pay $ per
query, while the source is on-prem and costs nothing.  Or maybe you can't
be sure of the timing of the downstream process.  Maybe querying downstream
requires adding another hook to your operator, opening a new connection,
and spinning up a warehouse.  If the target is files, there are
complexities there too.  The point is, there are costs and tradeoffs, and I
think watermarking based on the source can certainly be defensible.
Possibility is not always determinative.


