I think some of the discussion about incremental processing and idempotency is confusing the topic and distracting from the real question. As I said in my previous reply on this thread, many tasks rely on state that is kept somewhere in order to achieve idempotency efficiently, whether that is data in a target database table or the files that exist in a target filesystem. So the question we should be concerned with is not "are there patterns that fit the Airflow model that require state to be kept somewhere?" but simply "should that state be kept in Airflow?"

I think my answer is no, for a couple of reasons:

1. There is a possibility that the state kept in Airflow diverges from the reality of the target system it is supposed to represent. As Fokko said, "This data is already available" and storing it in Airflow is just "for optimization".
2. I think it adds unnecessary complexity to Airflow and risks feature creep. We would be better served keeping a tight focus on what we do well.

However, I have a lot of sympathy for the arguments that Daniel and Damian are making, because the same arguments apply to the idea of keeping state across reschedules. The typical use case for that feature is to store something like a BigQuery job id, which you then poll periodically. Rather than store that id in Airflow, you could add labels to the BigQuery job when it is created and then, at poll time, list the recent BigQuery jobs and find the one with the correct labels. If the system running your long-running job has no ability to label or tag a job, or to search for jobs, you could store the relevant id in a file in S3 (or similar), in a location that is templated on dag, task and execution_date.

In that sense, Airflow keeping state across reschedules or retries, and even the existing XCom, is just an optimization for data that generally exists or could be persisted elsewhere.

At its core I think Airflow should care about scheduling, dependencies, and which tasks ran and when, but shouldn't care about the outcomes of those tasks beyond success or failure. The existing XCom, and any expansion of it, adds a messaging component to Airflow that I think is not its core functionality. I'm not advocating removing XCom, but I say no to expanding it.

However, IF we are going to expand XCom and add more messaging between tasks, task instances and/or different runs of task instances, then:

1. Why limit it to a few possible scenarios?
2. Is a database really the best way of implementing that messaging functionality?

Chris

On Mon, Jan 13, 2020 at 7:03 PM Daniel Imberman <daniel.imber...@gmail.com> wrote:

> @jarek reading through the history of this and will try to come back with
> thoughts.
>
> On Mon, Jan 13, 2020 at 3:47 PM, Jarek Potiuk <jarek.pot...@polidea.com>
> wrote:
> I think the main subject has been hijacked (sorry Jacob) - true Pandora's
> box. I will just change the subject to something more appropriate and I
> propose we leave the original thread to discuss rescheduling.
>
> I wonder what others think about it (committers/PMCs)? I think this
> subject has been discussed already and maybe we could hear other opinions?
>
> J.
>
>
> On Mon, Jan 13, 2020 at 10:47 PM Shaw, Damian P. <
> damian.sha...@credit-suisse.com> wrote:
>
> > Here are a couple more examples of where as an Airflow user I am
> > introducing state in to my tasks (so they technically break
> > idempotency):
> > * Authentication mechanisms that return errors based on wrong details,
> > state is kept so retries don't cause multiple login attempts which would
> > cause the account to be locked, in this case manual intervention is
> > required to re-enable the tasks
> > * Sensors that send email/sms notifications on certain events, state is
> > kept so they're not redundantly sent on retry/reschedule, state is also
> > kept so they can "escalate" in urgency each time they are sent out even
> > across retries/reschedules.
> >
> > I understand wanting to keep Airflow tasks as stateless as possible but
> > from my perspective real world systems are not stateless (or without
> > side effect) and modeling that state is sometimes the only viable
> > solution.
> > The motivation of this thread is that some users are saying they need
> > state. I can tell you at least some users are already hacking state in
> > to their existing Airflow setup and would greatly appreciate it if, once
> > implemented, it doesn't have arbitrary limits on its use :). My 2 cents
> > as a user anyway.
> >
> > Damian
> >
> > -----Original Message-----
> > From: Daniel Standish <dpstand...@gmail.com>
> > Sent: Monday, January 13, 2020 14:03
> > To: dev@airflow.apache.org
> > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > Rescheduling in Operators
> >
> > @Jarek & Fokko
> > Firstly, thank you for the thorough and thoughtful replies -- I
> > appreciate it.
> >
> > I recognize that the execution_date-based idempotence pattern is core to
> > airflow. I understand that it motivated a lot of the design decisions,
> > and that it is a pattern that is endorsed and explicitly encouraged in
> > the documentation, and by the community in general.
> >
> > But surely it's not the position of the airflow project that data
> > pipelines deviating from this pattern are officially forbidden and
> > unsupported?
> >
> > I think there is a place in airflow for tasks that are not driven by
> > "execution_date", as well as tasks for which "airflow backfill" does not
> > make sense. Airflow provides great flexibility, and a lot of value and
> > applications beyond that one pattern.
> >
> > I believe there are cases where it makes sense to persist some
> > information for a task or a dag. Incremental processes are one. The
> > reschedule poke operator is another. Breaking a heavy data transfer
> > process into sub-batches and storing progress is another. And I'd be in
> > favor of providing a better built-in mechanism to do it within airflow.
> > I would not constrain it to async job ids; just arbitrary data however
> > the user wants to use it.
> >
> > I am not convinced that adding the capacity to store information
> > associated with dag+task undermines or meaningfully alters the airflow
> > project.
> >
> > I would bet that pipelines not following the canonical pattern are
> > pretty common in the wild. And when you don't for whatever reason lean
> > on execution_date, it's nice to have a different mechanism for storing
> > state.
> >
> > That's my take.
> >
> > Detailed responses below.
> >
> > @Jarek
> > RE: the "cluster maintainer" role, and the "idempotence guarantee", and
> > "by adding state you lose the property of idempotency"
> >
> > I don't think airflow provides any such guarantee as it is.
> >
> > - By themselves, even built-in airflow operators do not in general
> >   provide an idempotence guarantee. It really depends how they are used.
> >    - For example, a _SqlOperator has a `sql` field. Whether the task is
> >      idempotent or not depends entirely on the `sql` that is passed to
> >      the operator.
> > - With non-built-in operators, all bets are off.
> > - Variables and XCom can be used to persist and transmit state.
> > - People use catchup=False to facilitate non-idempotent tasks.
> > - As a scheduler, it can execute arbitrary processes.
> >
> > These facts mean that as a cluster maintainer, if there is an
> > idempotence guarantee, it's because the team enforces it -- not because
> > airflow guarantees it. Airflow is simply unable to enforce this by
> > itself.
> >
> > @Jarek & Fokko
> > RE: you are using the wrong tool for this job, airflow is not for
> > streaming, you should look at apache beam, read "beyond the horizon"
> >
> > Incremental processes are not necessarily streaming processes. The
> > incremental scenarios I outlined are still batch processes -- hourly,
> > every 4 hours, daily, that kind of thing. I am just determining the
> > intervals in a different way from an execution_date-driven process.
> > Should I use another tool?
> > Fundamentally, what we're talking about is the ability to store a small
> > bit of data associated with a task+dag. When I consider this gap in the
> > context of everything airflow provides, it's insignificant by
> > comparison, and there are ways I can solve it:
> >
> > - xcom (notwithstanding edge case problems e.g. getting cleared)
> > - variables (notwithstanding messiness)
> > - roll your own model (notwithstanding effort and complexity on my
> >   system)
> >    - incidentally, because of airflow's plugin system, if you do roll
> >      your own, you can even expose your in-house model through an admin
> >      view in airflow!
> >
> > Nonetheless, I don't think we should *make* users solve it. And I don't
> > think we need beam or spark e.g. to schedule sql scripts, merely because
> > they are incremental instead of idempotent.
> >
> > @Jarek
> > RE: adding support for state adds more work for committers
> >
> > Maybe any feature that is a new thing can create more work. But there is
> > also work it may eliminate. By providing explicit support for this
> > design pattern, users would not have to force it into XCom or Variable.
> > We could have a docs page "howto/incremental_process". This could reduce
> > questions on "how can i do an incremental process?". By *not* providing
> > support, the burden is shifted to users, who will find a way, but at
> > some cost.
> > As you have said, we could tell users this is not supported, but why
> > turn someone away when the platform can actually handle that pattern
> > just fine?
> >
> > @Fokko
> > RE:
> >
> > > This value needs to be stored somewhere.
> > > This data is already available, but would you want to duplicate this
> > > in Airflow for optimization, my answer would be: no.
> >
> > Let me try to clarify. The context here was a dim / fact / reporting
> > process that is downstream of several tables with varying delivery
> > cadences.
> > When incremental load begins, you need to record the state of the source
> > tables. Storing min of max modified dates from source tables allows us
> > to know what we need to process in the next run. This value truly does
> > need to be stored somewhere; there is no way to figure this out after
> > the fact by querying the content of the target.
> >
> > @Fokko
> > RE:
> >
> > > Also, if you have a modern data platform, fetching the watermark can
> > > be constant. I've also mentioned this on the Slack
> >
> > Yes, in many data processes it is possible to avoid storing a watermark.
> > Other times, it may not be possible. Still other times, while maybe it
> > is possible to avoid it, storing a watermark is the practical and
> > elegant solution.
> > Querying the target is not without its own pitfalls and complexities.
> > Maybe the target is a database where you must pay $ for each query,
> > while the source is on-prem and no cost. Or maybe you can't be sure of
> > the timing of the downstream process. Maybe querying downstream requires
> > adding another hook to your operator and opening a new connection and
> > spinning up a warehouse. If the target is files, there are complexities
> > here too. The point is, there are costs and tradeoffs, and I think
> > watermarking based on source can for sure be defensible. Possibility is
> > not always determinative.
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
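
As a rough illustration of the fallback described at the top of this message (keeping the id of a long-running job in a file whose location is templated on dag, task and execution_date), here is a minimal sketch. It is written under stated assumptions and is not an Airflow API: the function names (state_path, save_job_id, load_job_id, poke) and the JSON layout are made up for this example, a local directory stands in for S3 so the code runs without credentials, and submit / is_done are placeholders for whatever calls the external system offers.

```python
import json
from pathlib import Path
from typing import Callable, Optional


def state_path(root: Path, dag_id: str, task_id: str, execution_date: str) -> Path:
    """Build the per-task-instance location (hypothetical layout)."""
    return root / dag_id / task_id / f"{execution_date}.json"


def save_job_id(root: Path, dag_id: str, task_id: str,
                execution_date: str, job_id: str) -> None:
    """Persist the external job id outside of Airflow."""
    path = state_path(root, dag_id, task_id, execution_date)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps({"job_id": job_id}))


def load_job_id(root: Path, dag_id: str, task_id: str,
                execution_date: str) -> Optional[str]:
    """Return the stored job id, or None if no job was submitted yet."""
    path = state_path(root, dag_id, task_id, execution_date)
    if not path.exists():
        return None
    return json.loads(path.read_text())["job_id"]


def poke(root: Path, dag_id: str, task_id: str, execution_date: str,
         submit: Callable[[], str], is_done: Callable[[str], bool]) -> bool:
    """One poll: submit on the first call, otherwise check the stored job."""
    job_id = load_job_id(root, dag_id, task_id, execution_date)
    if job_id is None:
        # First poke: start the job and remember its id for later pokes.
        save_job_id(root, dag_id, task_id, execution_date, submit())
        return False
    # Later pokes (after a reschedule or retry): reuse the stored id.
    return is_done(job_id)
```

Because the location is derived only from dag, task and execution_date, a rescheduled or retried task finds the same file again without Airflow itself storing anything.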