Airflow already provides a mechanism for state persistence: the Variable, and, with caveats and flaws, XCom.
I personally persist state to the Airflow metastore database for a large percentage of our jobs. They are incremental jobs, and it is helpful to keep track of watermarks. I think incremental jobs are very common in Airflow implementations, though users probably often resort to imperfect vehicles for this, such as `execution_date` or XCom.

I have a very rough draft AIP that I haven't had enough time to work on, which explores adding explicit support for state persistence to Airflow: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence. I understand it is a controversial idea. (Note: the AIP is not ready for primetime.) I am of the mind that being able to persist some state is not a fundamental change to Airflow; it would just add explicit (and more user-friendly) support for something that is already quite common, and it fits fully within the wheelhouse of what Airflow exists to do.
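To make that concrete, here is a minimal sketch of the watermark pattern using a Variable; the variable name and the extraction helper are made up for illustration, not taken from any real DAG:

```python
from datetime import datetime, timezone

from airflow.models import Variable


def copy_rows_updated_since(watermark: str) -> str:
    """Placeholder for the actual incremental extraction; returns the new watermark."""
    # ... query the source for rows with updated_at > watermark and load them ...
    return datetime.now(timezone.utc).isoformat()


def incremental_load(**context):
    # Read the high watermark left by the last successful run; fall back to an
    # epoch default the very first time the job runs.
    watermark = Variable.get("orders_high_watermark",
                             default_var="1970-01-01T00:00:00")

    new_watermark = copy_rows_updated_since(watermark)

    # Persist the new watermark so the next run starts where this one stopped.
    Variable.set("orders_high_watermark", new_watermark)
```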
On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]> wrote:

> Furcy,
>
> To clarify, when I say that Airflow should not be in the business of keeping state about external systems, I specifically mean it shouldn't be keeping state to be shared between task instances. I completely understand that there may be external systems that are harder to work with, and that, like in your case, require the operator to be able to store some piece of information to make them idempotent. I just don't think that Airflow should provide that storage mechanism.
>
> I would think that most users of Airflow have access to some sort of cloud storage like S3 (which are really just key-value stores), and it's easy enough to write your job_id or whatever value you care about to a file with a prefix computed from the dag_id, task_id, execution_date or whatever combination of them you care about. Yes, it makes your operators more complex and they have to know about another system, but it keeps that complexity out of core Airflow. That's the trade-off.
>
> Ash,
> I'm not suggesting that XCom be removed from Airflow, and I understand there are use cases where it makes some things convenient. In your example though, it would be just as easy for the sensor to write the found object path as the contents of another file in S3, with a computable prefix based on the dag/task/execution_date.
>
> At its heart XCom is just a key-value store where the keys are limited to a very specific set of possibilities, and where key-value pairs are managed in some specific ways. The request here is to add another narrowly defined set of allowable keys, and as far as I can tell with no extra management of them. The only real advantage of using the Airflow database for XCom, or any expansion/variation on it, is that we know that all operators have access to the database.
>
> I'm not an expert, but I would wonder how well Postgres or MySQL perform as high-volume key-value stores. Does anyone actually use XCom at scale, and does that extra load on the database impact scheduling and other performance aspects of Airflow?
>
> Chris
>
> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]> wrote:
>
>> Just to touch on one point about XCom, and to reassure people that they, or something very like them, are in Airflow for the foreseeable future.
>>
>> As an example of an appropriate use for XCom: let's say a third party delivers you a set of files once a week, but the exact name of the files isn't known (by you) in advance. So you write a sensor that polls/checks S3 for the objects to appear in your bucket, and the sensor outputs the S3 object path to XCom, which the next processing step then examines to process the files.
>>
>> That sort of use case is not going anywhere.
>>
>> Cheers,
>> -ash
>>
>> On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:
>>
>> At the risk of repeating myself (from the previous thread that touched on this topic), I don't think Airflow should be in the business of keeping state about external systems. Airflow is about authoring and running workflows; it's not a messaging tool or a cluster management tool. I'm not convinced that the existing XCom functionality should really be a part of Airflow, and I certainly don't think it should be expanded upon or new variations added. I think storing state is especially risky, if for no other reason than the fact that Airflow is not the source of truth about those systems. It's very likely that at times the "state" that Airflow has saved will diverge from the actual state of the external system. Handling that nicely probably requires a bunch of custom code in the operators/hooks anyway, so I don't think it saves anything in terms of operator code complexity. Users would be much better served going to the source of truth to determine state. If part of the problem is that Livy is lacking in features (like being able to query the status of a particular job_id), then I think it would be more appropriate to add the needed features to that project. Airflow at its core shouldn't be concerned with making up for the failures of other tools.
>>
>> Also, as can be seen by just this discussion, it's hard to keep these extra features from expanding in scope. Jarek proposed something that would just store a single string, and immediately Furcy wants to expand it to store multiple strings. Either way, we are really just talking about a key-value store and putting limits on how that key can be structured; the key is made up of some predefined set of Airflow entities (for Jarek's proposal) or some arbitrary key along with those Airflow entities (Furcy's proposal).
>>
>> I know in the past I had a situation where I wanted to reuse a cluster across multiple data intervals, if one was already running (this was before I discovered Airflow, so it wasn't "execution dates" precisely). I can equally see use cases where I might want to share some resource for multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we added this, then why limit it to any one of those combinations? But then we just have an arbitrary key-value store. If you want to use Airflow for that then you can use Variables; if you want to use something else then you can.
>>
>> Unless Airflow is doing some extra management of these key-values in some way (like it does with clearing out XComs on reruns), then I see absolutely no added benefit. And even with some potential management by Airflow, I'm still not convinced that Airflow is the right place for it.
>>
>> Chris
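For illustration, a rough sketch of the external-storage pattern Chris describes, with the key computed from dag_id/task_id/execution_date, might look like this; the bucket name, key layout, and the provider import path (which differs on older Airflow versions) are assumptions:

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook  # older versions: airflow.hooks.S3_hook

BUCKET = "my-airflow-state"  # assumed bucket dedicated to task state


def _state_key(dag_id: str, task_id: str, execution_date: str) -> str:
    # One object per (dag, task, execution_date); reruns overwrite the same key.
    return f"state/{dag_id}/{task_id}/{execution_date}.txt"


def save_job_id(dag_id, task_id, execution_date, job_id):
    S3Hook().load_string(job_id,
                         key=_state_key(dag_id, task_id, execution_date),
                         bucket_name=BUCKET,
                         replace=True)


def load_job_id(dag_id, task_id, execution_date):
    hook = S3Hook()
    key = _state_key(dag_id, task_id, execution_date)
    # Returns None on the first try, the previously stored value on a rerun.
    if hook.check_for_key(key, bucket_name=BUCKET):
        return hook.read_key(key, bucket_name=BUCKET)
    return None
```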
>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:
>>
>> Thank you Jarek for the detailed explanation,
>>
>> That's exactly what I wanted to do: write a feature request to summarize all those discussions. I agree with you that the feature should be kept distinct from the XCom feature and that we should not piggyback this feature into XCom.
>>
>> The crux of the problem, I think, is that with XCom you do want the task to delete its xcom at the beginning of the retry. Correct me if I'm wrong, but one use case where it was necessary was having a task A and a task B that starts immediately after A and waits for some 'signal' from A. If A and B restart and A doesn't reset its signal, then B will use the signal from A's first try, which is incorrect.
>>
>> About the 3 solutions you mention:
>>
>> 1) Providing the job_id from outside. That works indeed. Sadly, in my use case Livy's API is poorly designed and only returns a generated job_id; you can't specify a custom one. You can't even find a job by name: I would have to list all the active job_ids and do a GET for each of them to get its name and find which one is the one I want. It's doable but inelegant.
>>
>> 2) Store the id in an external storage. Of course it would work, but it requires an external storage. More on that below.
>>
>> 3) I'm not sure I understand completely what you mean there, but I think you mean that the idempotency can be handled by the service you call (for instance BigQuery). Indeed that is another solution. If we were using Spark with a Hive metastore + locking, or the deltalake storage format, we could have something to prevent a job that runs twice from creating duplicates. This is another solution we are considering, but it is costly to change now.
>>
>> You guessed correctly that the feature I was asking for would be to provide some utility to let users implement solution 2) without requiring an external storage. I think it would be a QOL improvement for some use cases, just as it could be argued that XCom is just a QOL improvement and users could have used an external storage themselves. The main advantage it brings is making custom operators much easier to share and reuse across the Apache Airflow community, compared to having to set up some external storage.
>>
>> I have seen that some users used the metadata store itself as an external storage by adding a new table to the Airflow model:
>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e
>>
>> And others suggested using XCom itself as an external storage by storing information with a special task_id:
>> https://stackoverflow.com/a/57515143/2087478
>>
>> In the discussion thread you provided, it was also suggested to use Variables to store some persisting information.
>>
>> These 3 approaches work but feel quite "hacky", and I believe that providing such functionality would be good.
>>
>> Finally, I don't see the point of limiting the functionality to such an extent: providing an "IdempotencyIdStorage" that only allows you to store a string will just force people who need to store more than one id for one task (for whatever reason) to use some hack again, like storing a JSON inside the storage.
>>
>> I was more thinking about something quite similar to XCom (I liked the XState name suggestion), where the entry would be keyed by "(dag_id, task_id, execution_date, key)", where "key" can be whatever you want and would be kept across retries.
>>
>> I have read (quickly) through the "Pandora's Box" thread you linked. Indeed it looks like there would be many ways to misuse such a feature.
>> I do understand the importance of idempotency, and it looks like my use case is one of the first ever listed where I do need to persist a state across retries to make my operator really idempotent.
>>
>> I'm surprised no one came up with it given how frequent the Spark + Airflow combination is (well, the BigQueryOperator was one too, but it found another solution).
>>
>> Of course we can blame it on Livy for being poorly conceived (unlike BigQuery), or we can blame it on Spark for not having a built-in security mechanism to prevent double-writes, but I think that, as the above hacks show, you can't really prevent users from shooting themselves in the foot if that's what they really want to do.
>>
>> While I do think that making things foolproof is important, I believe it's also in Python's philosophy to *not* make things foolproof to the detriment of simplicity for the right use cases. But I do understand that the use cases are different and contradictory: some would require the state to be persisted across reschedules and not retries; mine would require the state to be persisted across retries and not reschedules.
>>
>> Maybe the Airflow-y way for that would be to have one task that does the submit and writes the job id to an xcom, then one task that checks the progress of the job, but it feels very cumbersome to double the number of tasks just for that. Plus I'm not sure we could make the first task retry if the second task fails...
>>
>> Thanks again,
>>
>> Furcy
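For illustration, the Variables workaround referenced above — persisting a job id under a key derived from the task instance so that it survives retries — could look roughly like this; the key format and function names are made up, and this is the "hacky" pattern being discussed, not a recommended Airflow API:

```python
from airflow.models import Variable


def _state_key(dag_id: str, task_id: str, execution_date: str) -> str:
    # One Variable per task instance; retries of the same instance see the same key.
    return f"livy_batch_id__{dag_id}__{task_id}__{execution_date}"


def remember_batch_id(dag_id, task_id, execution_date, batch_id):
    Variable.set(_state_key(dag_id, task_id, execution_date), str(batch_id))


def recall_batch_id(dag_id, task_id, execution_date):
    # Returns None on the first try, the previously submitted id on a retry, so the
    # operator can check whether that job is still running before resubmitting.
    return Variable.get(_state_key(dag_id, task_id, execution_date), default_var=None)
```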
>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]> wrote:
>>
>> I think we've discussed several approaches like that, and using the XCom name (which for many people would mean "let's just extend the XCom table for that") is not a very good idea IMHO. I think this is very different functionality/logic which we might or might not agree to implement as a community. Naming it "XCom" and trying to extend the XCom table behavior might be problematic.
>>
>> Not sure if you are aware, but we had a very similar discussion about it recently (without clear conclusions, but at least you can see what kind of issues/problems different people have with this approach):
>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>
>> I am not saying it is impossible to do, but I think it's a matter of how we formulate the "use case". It's very tempting to implement a generic intra-task communication mechanism, indeed. But it can very easily lead to people abusing it and bypassing the guarantees (idempotency mainly) that Airflow provides for backfilling and re-running tasks. I thought a bit after the latest discussion kind of died out, and I have one possible solution to the problem.
>>
>> Let me explain what I think about it (but others can have different opinions of course):
>>
>> So far the discussion was that there are several ways to achieve what you want (and it's really about which entity is providing the "idempotency" guarantee):
>>
>> 1) Similarly to the just-merged BigQuery Insert Job https://github.com/apache/airflow/pull/8868/files - you can provide the job_id from outside. You'd need to work out a job_id naming scheme that works in your case and make sure that when you re-run your task with the same (dag_id, task_id, execution_date) you will get the same id. Then the "uniqueness", and thus idempotency, is handled by the logic written in the DAG.
>>
>> 2) Store the id in some external storage (via one of the hooks - where it can be queried in the way that works for you). Then the idempotency is actually handled by the logic in your Operator + some external storage.
>>
>> 3) Query your service and retrieve the JOB ID from it - but you have to have a way to query for the job related to your "dag id + task + execution_date". Then the idempotency is actually handled by the service you are using.
>>
>> In the use case you describe, this is the only thing you need - an "idempotency source". I believe you would like to get case 2) from above, but without having to use external storage to store the "unique id": something that will let each task in the same dag run set or retrieve a unique value for that particular task. One value should be enough, assuming that each operator/task works on one external data "source".
>>
>> My current thinking is:
>>
>> Why don't we provide such a dedicated idempotency service inside Airflow? We already have a DB, and we could have an "IdempotencyIdStorage" class with two methods:
>>
>> * .set(id: str) and
>> * .get() -> str
>>
>> And the data stored there should be a string keyed by "(dag_id, task_id, execution_date)" - available also via Jinja templating. There is no intra-task communication here and very little possibility of abuse, and it seems to solve the major pain point where you have to provide your own storage to get idempotency if your service does not provide one or you do not want to delegate it to the DAG writer.
>>
>> J.
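For concreteness, a minimal sketch of the interface Jarek describes might look like the following; this is not an existing or agreed Airflow API, and the in-memory dictionary only stands in for what would really live in the metadata database:

```python
class IdempotencyIdStorage:
    """Illustrative only: one string of state per (dag_id, task_id, execution_date)."""

    _store: dict = {}  # a real implementation would persist to the metadata database

    def __init__(self, dag_id: str, task_id: str, execution_date: str):
        # One slot of state per task instance, shared by every retry of that instance.
        self._key = (dag_id, task_id, execution_date)

    def set(self, id: str) -> None:
        IdempotencyIdStorage._store[self._key] = id

    def get(self) -> str:
        return IdempotencyIdStorage._store.get(self._key, "")
```

Under this sketch an operator would call `storage.set(job_id)` after submitting a job and `storage.get()` on a retry to decide whether a job already exists.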
>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:
>>
>> The use case I'm referring to is that you can't use xcom to let a task read information from its past attempts, because when a task starts, its xcom is automatically deleted.
>>
>> My specific use case is that we have a custom LivyOperator that calls Livy to start batch Spark jobs. When you start a batch job, Livy returns a job_id. Sometimes our operator can fail for one reason or another (for instance if Livy is unreachable for a while). When the task retries, it calls Livy again, which starts the same Spark job, but the problem is that the Spark job from the first attempt can still be running, and then we have a batch job that runs twice simultaneously and creates duplicates in the output.
>>
>> What we tried to do is get the job_id from the first try, check if the job is still running, and wait for it to complete if it is.
>>
>> We tried using xcom to let the task send a message to itself (to its next try), but xcom is meant for "inter-task communication" only, so this doesn't work and is not intended to work.
>>
>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>>
>> Hi Furcy,
>>
>> Can you give a concrete example of what you mean by intra-task xcom? Depending on your use case, this may already be possible.
>>
>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>
>> Hello,
>>
>> I would like to open a feature request for Airflow to support "intra-task xcom".
>>
>> It seems that there are several distinct use cases for it already, and only ugly workarounds, and I wanted to list them in a JIRA ticket. I wanted to summarize links to the use cases and past attempts, and the recommended approach (which apparently would be to create a feature distinct from xcom to support this; it could be called intra-com or self-com?).
>>
>> Do you know if such a ticket already exists? I couldn't find one. Also, I can't create any ticket due to some obscure bug (see my other email).
>>
>> Thanks,
>>
>> Furcy
>>
>> --
>>
>> Jarek Potiuk
>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>
>> M: +48 660 796 129 <+48660796129>
