@Jarek & Fokko
Firstly, thank you for the thorough and thoughtful replies -- I appreciate
it.
I recognize that the execution_date-based idempotence pattern is core to
Airflow. I understand that it motivated a lot of the design decisions, and
that it is a pattern endorsed and explicitly encouraged in the
documentation and by the community in general.
But surely it's not the position of the Airflow project that data
pipelines deviating from this pattern are officially forbidden and
unsupported?
I think there is a place in Airflow for tasks that are not driven by
"execution_date", as well as tasks for which "airflow backfill" does not
make sense. Airflow provides great flexibility, and a lot of value and
applications, beyond that one pattern.
I believe there are cases where it makes sense to persist some information
for a task or a DAG. Incremental processes are one. The reschedule poke
operator is another. Breaking a heavy data transfer process into
sub-batches and storing progress is another. And I'd be in favor of
providing a better built-in mechanism to do it within Airflow. I would not
constrain it to async job IDs; it should hold arbitrary data, however the
user wants to use it.
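To make that concrete, here is a rough sketch of the kind of interface I
have in mind. To be clear, nothing like this exists in Airflow today;
`get_state_store`, its methods, and `copy_rows_after` are all hypothetical:

    # Hypothetical sketch only -- this is NOT an existing Airflow API.
    # The idea: arbitrary state keyed by (dag_id, task_id), readable and
    # writable from inside the task.
    def transfer(**context):
        state = context["ti"].get_state_store()   # hypothetical accessor
        last_id = state.get("high_watermark", default=0)
        new_max = copy_rows_after(last_id)        # user-defined transfer step
        state.set("high_watermark", new_max)

The point is just a small key-value store scoped to the task, so state
does not leak across DAGs the way Variables can.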
I am not convinced that adding the capacity to store information
associated with DAG+task undermines or meaningfully alters the Airflow
project.
I would bet that pipelines not following the canonical pattern are pretty
common in the wild. And when you don't, for whatever reason, lean on
execution_date, it's nice to have a different mechanism for storing state.
That's my take.
Detailed responses below.
@Jarek
RE: the "cluster maintainer" role, and the "idempotence guarantee", and "by
adding state you lose the property of idempotency"
I don't think Airflow provides any such guarantee as it stands.
- By themselves, even built-in Airflow operators do not in general
provide an idempotence guarantee. It really depends on how they are used.
- For example, a _SqlOperator has a `sql` field. Whether the task is
idempotent or not depends entirely on the `sql` that is passed to the
operator (see the sketch below).
- With non-built-in operators, all bets are off.
- Variables and XCom can be used to persist and transmit state.
- People use catchup=False to facilitate non-idempotent tasks.
- As a scheduler, Airflow can execute arbitrary processes.
These facts mean that, as a cluster maintainer, if you have an idempotence
guarantee, it's because your team enforces it -- not because Airflow
guarantees it. Airflow is simply unable to enforce this by itself.
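As a minimal illustration of the _SqlOperator point, here are two tasks
using the same operator class, one idempotent and one not. The table names
are made up, and PostgresOperator is just a stand-in for any SQL operator:

    from airflow.operators.postgres_operator import PostgresOperator

    # Idempotent: rerunning for the same ds overwrites the same partition.
    load_day = PostgresOperator(
        task_id="load_day",
        sql="""
            DELETE FROM fact_sales WHERE ds = '{{ ds }}';
            INSERT INTO fact_sales
            SELECT * FROM staging_sales WHERE ds = '{{ ds }}';
        """,
    )

    # Not idempotent: rerunning appends duplicate rows.
    append_rows = PostgresOperator(
        task_id="append_rows",
        sql="INSERT INTO fact_sales SELECT * FROM staging_sales;",
    )

Same operator, opposite properties; the guarantee lives in the user's SQL,
not in Airflow.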
@Jarek & Fokko
RE: you are using the wrong tool for this job, Airflow is not for
streaming, you should look at Apache Beam, read "Beyond the Horizon"
Incremental processes are not necessarily streaming processes. The
incremental scenarios I outlined are still batch processes -- hourly, every
4 hours, daily, that kind of thing. I am just determining the intervals in
a different way from an execution_date-driven process.
Should I use another tool? Fundamentally, what we're talking about is the
ability to store a small bit of data associated with a task+DAG. When
I consider this gap in the context of everything Airflow provides, it's
insignificant by comparison, and there are ways I can solve it:
- XCom (notwithstanding edge-case problems, e.g. getting cleared)
- Variables (notwithstanding messiness; see the sketch after this list)
- roll your own model (notwithstanding effort and complexity on my
system)
  - incidentally, because of Airflow's plugin system, if you do roll
    your own, you can even expose your in-house model through an admin
    view in Airflow!
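For instance, here is roughly what the Variables workaround looks like
today, in a callable invoked with the task context; the key naming
convention and `process_since` are mine, for illustration:

    from airflow.models import Variable

    def incremental_load(**context):
        # Namespace the Variable key by dag_id and task_id by convention,
        # since Variables are global.
        key = "{}.{}.watermark".format(
            context["dag"].dag_id, context["task"].task_id
        )
        last = Variable.get(key, default_var=None)
        new_watermark = process_since(last)  # user-defined incremental step
        Variable.set(key, str(new_watermark))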
Nonetheless, I don't think we should *make* users solve it. And I don't
think we need Beam or Spark just to schedule SQL scripts, merely because
they are incremental instead of idempotent.
@Jarek
RE: adding support for state adds more work for committers
Maybe any new feature can create more work. But there is also work it may
eliminate. By providing explicit support for this design pattern, users
would not have to force it into XCom or Variable. We could have a docs
page "howto/incremental_process", which could reduce questions of "how can
I do an incremental process?". By *not* providing support, the burden is
shifted to users, who will find a way, but at some cost.
As you have said, we could tell users this is not supported, but why turn
someone away when the platform can actually handle that pattern just fine?
@Fokko
RE:
> > This value needs to be stored somewhere.
> This data is already available, but would you want to duplicate this in
> Airflow for optimization, my answer would be: no.
Let me try to clarify. The context here was a dim / fact / reporting
process that is downstream of several tables with varying delivery
cadences.
When an incremental load begins, you need to record the state of the
source tables. Storing the min of the max modified dates across the source
tables lets us know what we need to process in the next run. This value
truly does need to be stored somewhere; there is no way to figure it out
after the fact by querying the content of the target.
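To illustrate (table and column names are hypothetical): each source
reports its own high-water mark, and the smallest of those is the point up
to which *all* inputs are complete -- the safe lower bound for the next
run.

    # Table and column names are hypothetical.
    watermark_sql = """
        SELECT MIN(max_modified) AS watermark
        FROM (
            SELECT MAX(modified_at) AS max_modified FROM source_orders
            UNION ALL
            SELECT MAX(modified_at) AS max_modified FROM source_customers
            UNION ALL
            SELECT MAX(modified_at) AS max_modified FROM source_products
        ) t
    """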
@Fokko
RE:
> Also, if you have a modern data platform, fetching the watermark can be
> constant. I've also mentioned this on the Slack
Yes, in many data processes it is possible to avoid storing a watermark.
Other times it is not. And still other times, while it may be possible to
avoid it, storing the watermark is the practical and elegant solution.
Querying the target is not without its own pitfalls and complexities.
Maybe the target is a database where you pay for each query, while the
source is on-prem and costs nothing. Or maybe you can't be sure of the
timing of the downstream process. Maybe querying downstream requires
adding another hook to your operator, opening a new connection, and
spinning up a warehouse. If the target is files, there are complexities
there too. The point is, there are costs and tradeoffs, and I think
watermarking based on the source can certainly be defensible. Possibility
is not always determinative.