Here are a couple more examples of where, as an Airflow user, I am introducing state into my tasks (so they technically break idempotency):

* Authentication mechanisms that return errors based on wrong details. State is kept so retries don't cause multiple login attempts, which would cause the account to be locked; in this case manual intervention is required to re-enable the tasks.
* Sensors that send email/sms notifications on certain events. State is kept so they're not redundantly sent on retry/reschedule. State is also kept so they can "escalate" in urgency each time they are sent out, even across retries/reschedules (a rough sketch of the pattern follows below).
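To make the second example concrete, here is a minimal sketch of how that state gets hacked in today (illustrative only; assumes Airflow 1.10.x, and the Variable key scheme plus the _condition_met/_send_alert helpers are made up for the example):

from airflow.models import Variable
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone


class EscalatingAlertSensor(BaseSensorOperator):
    """Remembers, across retries and reschedules, how many alerts it
    has already sent, so notifications are not redundantly re-sent and
    each new one escalates in urgency."""

    def poke(self, context):
        key = "alert_level_{}_{}".format(self.dag_id, self.task_id)
        sent = int(Variable.get(key, default_var=0))
        if self._condition_met(context):
            Variable.set(key, 0)  # reset for the next run
            return True
        hours_late = (
            timezone.utcnow() - context["execution_date"]
        ).total_seconds() / 3600.0
        # Escalation ladder: alert at 1h, 4h and 24h late. Because the
        # counter lives in a Variable it survives retries/reschedules,
        # so alerts already sent are never repeated.
        thresholds = [1, 4, 24]
        if sent < len(thresholds) and hours_late >= thresholds[sent]:
            self._send_alert(urgency=sent)
            Variable.set(key, sent + 1)
        return False

    def _condition_met(self, context):
        raise NotImplementedError("check the event being waited on")

    def _send_alert(self, urgency):
        raise NotImplementedError("send email/sms at the given urgency")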
I understand wanting to keep Airflow tasks as stateless as possible, but from my perspective real-world systems are not stateless (or without side effects), and modeling that state is sometimes the only viable solution. The motivation of this thread is that some users are saying they need state; I can tell you at least some users are already hacking state into their existing Airflow setup and would greatly appreciate it if, when implemented, it doesn't have arbitrary limits on its use :).

My 2 cents as a user anyway.

Damian

-----Original Message-----
From: Daniel Standish <dpstand...@gmail.com>
Sent: Monday, January 13, 2020 14:03
To: dev@airflow.apache.org
Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

@Jarek & Fokko

Firstly, thank you for the thorough and thoughtful replies -- I appreciate it.

I recognize that the execution_date-based idempotence pattern is core to airflow. I understand that it motivated a lot of the design decisions, and that it is a pattern that is endorsed and explicitly encouraged in the documentation, and by the community in general. But surely it's not the position of the airflow project that data pipelines deviating from this pattern are officially forbidden and unsupported? I think there is a place in airflow for tasks that are not driven by "execution_date", as well as tasks for which "airflow backfill" does not make sense. Airflow provides great flexibility, and a lot of value and applications beyond that one pattern.

I believe there are cases where it makes sense to persist some information for a task or a dag. Incremental processes are one. The reschedule poke operator is another. Breaking a heavy data transfer process into sub-batches and storing progress is another. And I'd be in favor of providing a better built-in mechanism to do it within airflow. I would not constrain it to async job ids; just arbitrary data, however the user wants to use it.

I am not convinced that adding the capacity to store information associated with dag+task undermines or meaningfully alters the airflow project. I would bet that pipelines not following the canonical pattern are pretty common in the wild. And when you don't, for whatever reason, lean on execution_date, it's nice to have a different mechanism for storing state.

That's my take. Detailed responses below.

@Jarek

RE: the "cluster maintainer" role, and the "idempotence guarantee", and "by adding state you lose the property of idempotency"

I don't think airflow provides any such guarantee as it is.

- By themselves, even built-in airflow operators do not in general provide an idempotence guarantee. It really depends how they are used.
- For example, a _SqlOperator has a `sql` field. Whether the task is idempotent or not depends entirely on the `sql` that is passed to the operator (a sketch follows below).
- With non-built-in operators, all bets are off.
- Variables and XCom can be used to persist and transmit state.
- People use catchup=False to facilitate non-idempotent tasks.
- As a scheduler, it can execute arbitrary processes.

These facts mean that as a cluster maintainer, if there is an idempotence guarantee, it's because the team enforces it -- not because airflow guarantees it. Airflow is simply unable to enforce this by itself.
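To make the _SqlOperator bullet concrete, a quick sketch (the dag, connection id, and table names are made up for illustration; the same operator class is idempotent or not depending purely on the sql it is given):

from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

with DAG(
    "idempotence_demo",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
) as dag:
    # Idempotent: re-running for the same execution_date rewrites
    # exactly that interval's partition, so retries and backfills
    # are safe.
    idempotent_load = PostgresOperator(
        task_id="load_fact_idempotent",
        postgres_conn_id="warehouse",
        sql="""
            DELETE FROM fact_sales WHERE ds = '{{ ds }}';
            INSERT INTO fact_sales
            SELECT * FROM staging_sales WHERE ds = '{{ ds }}';
        """,
    )

    # Not idempotent: a blind append -- every retry or backfill
    # duplicates rows. Same operator, different sql.
    append_load = PostgresOperator(
        task_id="load_fact_append",
        postgres_conn_id="warehouse",
        sql="INSERT INTO fact_sales SELECT * FROM staging_sales;",
    )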
@Jarek & Fokko

RE: you are using the wrong tool for this job, airflow is not for streaming, you should look at apache beam, read "beyond the horizon"

Incremental processes are not necessarily streaming processes. The incremental scenarios I outlined are still batch processes -- hourly, every 4 hours, daily, that kind of thing. I am just determining the intervals in a different way from an execution_date-driven process. Should I use another tool? Fundamentally, what we're talking about is the ability to store a small bit of data associated with a task+dag. When I consider this gap in the context of everything airflow provides, it's insignificant by comparison, and there are ways I can solve it:

- xcom (notwithstanding edge case problems, e.g. getting cleared)
- variables (notwithstanding messiness)
- roll your own model (notwithstanding effort and complexity on my system)
- incidentally, because of airflow's plugin system, if you do roll your own, you can even expose your in-house model through an admin view in airflow!

Nonetheless, I don't think we should *make* users solve it. And I don't think we need beam or spark, e.g., to schedule sql scripts merely because they are incremental instead of idempotent.

@Jarek

RE: adding support for state adds more work for committers

Maybe any feature that is a new thing can create more work. But there is also work it may eliminate. By providing explicit support for this design pattern, users would not have to force it into XCom or Variable. We could have a docs page "howto/incremental_process". This could reduce questions on "how can I do an incremental process?". By *not* providing support, the burden is shifted to users, who will find a way, but at some cost. As you have said, we could tell users this is not supported, but why turn someone away when the platform can actually handle that pattern just fine?

@Fokko

RE:
> This value needs to be stored somewhere.
> This data is already available, but would you want to duplicate this
> in Airflow for optimization, my answer would be: no.

Let me try to clarify. The context here was a dim / fact / reporting process that is downstream of several tables with varying delivery cadences. When incremental load begins, you need to record the state of the source tables. Storing the min of max modified dates from the source tables allows us to know what we need to process in the next run. This value truly does need to be stored somewhere; there is no way to figure this out after the fact by querying the content of the target.

@Fokko

RE:
> Also, if you have a modern data platform, fetching the watermark can
> be constant. I've also mentioned this on the Slack

Yes, in many data processes it is possible to avoid storing a watermark. Other times, it may not be possible. Still other times, while maybe it is possible to avoid it, storing a watermark is the practical and elegant solution. Querying the target is not without its own pitfalls and complexities. Maybe the target is a database where you must pay $ for each query, while the source is on-prem and no cost. Or maybe you can't be sure of the timing of the downstream process. Maybe querying downstream requires adding another hook to your operator and opening a new connection and spinning up a warehouse. If the target is files, there are complexities here too. The point is, there are costs and tradeoffs, and I think watermarking based on source can for sure be defensible. Possibility is not always determinative.
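For concreteness, a minimal sketch of the "min of max modified dates" idea, using a Variable as the store (the connection id, table list, and Variable key are made up; this is just one way users hack the pattern in today, e.g. as PythonOperator callables):

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable

SOURCE_TABLES = ["orders", "customers", "shipments"]  # illustrative


def record_watermark(**context):
    """Store the min of max modified dates across the source tables.
    The slowest-arriving source bounds what the next incremental run
    can safely process. Assumes every table has at least one row."""
    hook = PostgresHook(postgres_conn_id="source_db")
    max_dates = [
        hook.get_first("SELECT max(modified_at) FROM {}".format(t))[0]
        for t in SOURCE_TABLES
    ]
    Variable.set("dim_fact_watermark", min(max_dates).isoformat())


def load_increment(**context):
    """Read the watermark back to bound the next incremental query."""
    since = Variable.get("dim_fact_watermark")
    hook = PostgresHook(postgres_conn_id="source_db")
    rows = hook.get_records(
        "SELECT * FROM orders WHERE modified_at > %s", parameters=[since]
    )
    # ... transform and merge rows into the dim/fact target ...

The point is just that the watermark survives between runs without being forced into XCom, and without querying the (possibly expensive, possibly unavailable) target.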