I think no-one wants to remove xcom really :)

On Tue, Jun 2, 2020 at 10:32 AM Michael Lutz <[email protected]> wrote:
Please do not remove existing XCom functionality. I am using it extensively. If you implement something more robust or elegant, that would be fine. I feel that a more robust state management system would be helpful; it feels like an area for improvement.

On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <[email protected]> wrote:

I think this subject has come up so often that I am also slowly changing my mind in favor of making an explicit state persistence "service".

Whether it's only one key or more is secondary, I think. But if users are already using Variables to keep state for tasks, this is a clear sign that we are missing a crucial feature, and that our users are already abusing Airflow in the way we were trying to prevent by not introducing a "state service".

With the recent secrets backend implementation, where Variables might be kept in a secrets backend - not only the metastore - you potentially have no write access to the backend. There is not even "write" support in the current MetastoreBackend implementation for writing variables. So as far as I can see, we already have configurations where writing a variable in one place and reading it elsewhere might not work. You can of course set several backends, with the metastore as the last fallback, but for me that opens up different problems: what happens if the key is present in both, one task writes it to the metastore, but another task reads it from the secrets backend?

It seems that Variables are being abused in exactly the way we want to prevent a "StateService" from being abused - and shying away from that is really like closing our eyes and pretending it's not happening.

So maybe we can make an AIP with this approach:

1) Variables -> mostly read-only (for tasks), used to keep configuration shared between workers (but not at a task level).
2) StateService (or whatever we call it) -> where we keep state information for a specific dag + task + execution_date.

J.

On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <[email protected]> wrote:

Airflow already provides a mechanism for state persistence: the Variable, and, with caveats and flaws, XCom.

I personally persist state to the airflow metastore database for a large percentage of our jobs. They are incremental jobs, and it is helpful to keep track of the watermark.

I think that incremental jobs are probably very common in Airflow implementations, though users probably often resort to imperfect vehicles for this, such as `execution_date` or XCom.

I have a very drafty draft AIP that I haven't had enough time to work on, which explores adding explicit support for state persistence to Airflow:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
Though I understand it is a controversial idea. (Note: the AIP is not ready for primetime.)

I am of the mind that being able to persist some state is not a fundamental change to Airflow; it would just add explicit (and more user-friendly) support for something that is already quite common, and it fits fully within the wheelhouse of what Airflow exists to do.
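(To make the watermark pattern Daniel describes concrete, here is a minimal sketch using the existing Variable API; the key scheme and the placeholder values are invented for illustration.)

    from airflow.models import Variable

    def incremental_load(**context):
        # One watermark per dag + task; this key scheme is just an example.
        key = "watermark__{}__{}".format(
            context["dag"].dag_id, context["task"].task_id)
        last = Variable.get(key, default_var="1970-01-01")

        # ... extract and load everything with updated_at > last ...
        new_watermark = "2020-06-02"  # in practice: max(updated_at) of loaded rows

        # Persist the new high-water mark for the next run.
        Variable.set(key, new_watermark)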
On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]> wrote:

Furcy,

To clarify, when I say that Airflow should not be in the business of keeping state about external systems, I specifically mean it shouldn't be keeping state to be shared between task instances. I completely understand that there may be external systems that are harder to work with and that, like in your case, require the operator to be able to store some piece of information to make them idempotent. I just don't think that Airflow should provide that storage mechanism.

I would think that most users of Airflow have access to some sort of cloud storage like S3 (which is really just a key-value store), and it's easy enough to write your job_id or whatever value you care about to a file with a prefix computed from the dag_id, task_id, execution_date or whatever combination of them you care about. Yes, it makes your operators more complex, and they have to know about another system, but it keeps that complexity out of core Airflow. That's the trade-off.

Ash,

I'm not suggesting that XCom be removed from Airflow, and I understand there are use cases where it makes some things convenient. In your example, though, it would be just as easy for the sensor to write the found object path as the contents of another file in S3, with a computable prefix based on the dag/task/execution_date.

At its heart, XCom is just a key-value store where the keys are limited to a very specific set of possibilities, and where key-value pairs are managed in some specific ways. The request here is to add another narrowly defined set of allowable keys, and as far as I can tell, with no extra management of them. The only real advantage of using the Airflow database for XCom, or any expansion/variation on it, is that we know all operators have access to the database.

I'm not an expert, but I would wonder how well Postgres or MySQL perform as high-volume key-value stores. Does anyone actually use XCom at scale, and does that extra load on the database impact scheduling and other performance aspects of Airflow?

Chris
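(A rough sketch of the external-storage approach Chris describes, using Airflow's S3Hook; the bucket name and key scheme below are assumptions made for the example.)

    from airflow.hooks.S3_hook import S3Hook

    STATE_BUCKET = "my-state-bucket"  # hypothetical bucket

    def state_key(ti):
        # Prefix computed from dag_id / task_id / execution_date, as suggested.
        return "state/{}/{}/{}".format(
            ti.dag_id, ti.task_id, ti.execution_date.isoformat())

    def save_job_id(ti, job_id):
        S3Hook().load_string(job_id, key=state_key(ti),
                             bucket_name=STATE_BUCKET, replace=True)

    def read_job_id(ti):
        hook = S3Hook()
        if hook.check_for_key(state_key(ti), bucket_name=STATE_BUCKET):
            return hook.read_key(state_key(ti), bucket_name=STATE_BUCKET)
        return None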
On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]> wrote:

Just to touch on one point about XCom, and to reassure people: they, or something very like them, are in Airflow for the foreseeable future.

As an example of an appropriate use for XCom: let's say a third party delivers you a set of files once a week, but the exact name of the files isn't known (by you) in advance. So you write a sensor that polls/checks S3 for the objects to appear in your bucket, and the sensor outputs the S3 object path to XCom, which the next processing step then examines to process the files.

That sort of use case is not going anywhere.

Cheers,
-ash
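(Ash's sensor example might look roughly like this; the sensor class, bucket, prefix, and XCom key are all invented for illustration.)

    from airflow.hooks.S3_hook import S3Hook
    from airflow.sensors.base_sensor_operator import BaseSensorOperator

    class WeeklyDeliverySensor(BaseSensorOperator):
        """Waits for the third party's files and publishes their paths via XCom."""

        def poke(self, context):
            keys = S3Hook().list_keys(bucket_name="incoming-bucket",
                                      prefix="deliveries/")
            if not keys:
                return False
            # The downstream task pulls this with xcom_pull(key="object_paths").
            context["ti"].xcom_push(key="object_paths", value=keys)
            return True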
On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:

At the risk of repeating myself (from the previous thread that touched on this topic), I don't think Airflow should be in the business of keeping state about external systems. Airflow is about authoring and running workflows; it's not a messaging tool or a cluster management tool. I'm not convinced that the existing XCom functionality should really be a part of Airflow, and I certainly don't think it should be expanded upon or have new variations added. I think storing state is especially risky, if for no other reason than the fact that Airflow is not the source of truth about those systems. It's very likely that at times the "state" that Airflow has saved will diverge from the actual state of the external system. Handling that nicely probably requires a bunch of custom code in the operators/hooks anyway, so I don't think it saves anything in terms of operator code complexity. Users would be much better served going to the source of truth to determine state. If part of the problem is that Livy is lacking in features (like being able to query the status of a particular job_id), then I think it would be more appropriate to add the needed features to that project. Airflow at its core shouldn't be concerned with making up for the failures of other tools.

Also, as can be seen by just this discussion, it's hard to keep these extra features from expanding in scope. Jarek proposed something that would just store a single string, and immediately Furcy wants to expand it to store multiple strings. Either way, we are really just talking about a key-value store and putting limits on how the key can be structured; the key is made up of some predefined set of Airflow entities (in Jarek's proposal) or some arbitrary key along with those Airflow entities (in Furcy's proposal).

I know in the past I had a situation where I wanted to reuse a cluster across multiple data intervals, if one was already running (this was before I discovered Airflow, so it wasn't "execution dates" precisely). I can equally see use cases where I might want to share some resource for multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we added this, then why limit it to any one of those combinations? But then we just have an arbitrary key-value store. If you want to use Airflow for that, then you can use Variables; if you want to use something else, then you can.

Unless Airflow is doing some extra management of these key-values in some way (like it does with clearing out XComs on reruns), then I see absolutely no added benefit. And even with some potential management by Airflow, I'm still not convinced that Airflow is the right place for it.

Chris
On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:

Thank you Jarek for the detailed explanation.

That's exactly what I wanted to do: write a feature request to summarize all those discussions. I agree with you that the feature should be kept distinct from the XCom feature and that we should not piggyback this feature onto XCom.

The crux of the problem, I think, is that with XCom you do want the task to delete its XCom at the beginning of a retry. Correct me if I'm wrong, but one use case where this was necessary was having a task A and a task B that starts immediately after A and waits for some 'signal' from A. If A and B restart and A doesn't reset its signal, then B will use the signal from A's first try, which is incorrect.

About the 3 solutions you mention:

1) Providing the job_id from outside. That works indeed. Sadly, in my use case Livy's API is poorly designed and only returns a generated job_id; you can't specify a custom one. You can't even find a job by name: I would have to list all the active job_ids and do a GET for each of them to get its name and find which one is the one I want. It's doable, but inelegant.

2) Storing the id in an external storage. Of course that would work, but it requires an external storage. More on that below.

3) I'm not sure I understand completely what you mean there, but I think you mean that the idempotency can be handled by the service you call (for instance BigQuery). Indeed, that is another solution. If we were using Spark with a Hive metastore + locking, or the deltalake storage format, we could have something to prevent a job that runs twice from creating duplicates. This is another solution we are considering, but it is costly to change now.

You guess correctly that the feature I was asking for would be to provide some utility to let users implement solution 2) without requiring an external storage. I think it would be a QOL improvement for some use cases, just like it could be argued that XCom is just a QOL improvement and users could have used an external storage themselves. The main advantage it brings is making custom operators much easier to share and reuse across the Apache Airflow community, compared to having to set up some external storage.

I have seen that some users used the metadata store itself as an external storage by adding a new table to the Airflow model:
http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e

And others suggested using XCom itself as an external storage by storing information with a special task_id:
https://stackoverflow.com/a/57515143/2087478

In the discussion thread you provided, it was also suggested to use Variables to store some persisting information.

These 3 approaches work but feel quite "hacky", and I believe that providing such functionality would be good.

Finally, I don't see the point of limiting the functionality to such an extent: providing an "IdempotencyIdStorage" that only allows you to store a string will just force people who need to store more than one id for one task (for whatever reason) to use some hack again, like storing a JSON inside the storage.

I was thinking more of something quite similar to XCom (I liked the XState name suggestion), where the entry would be keyed by (dag_id, task_id, execution_date, key), where "key" can be whatever you want and would be kept across retries.

I have read (quickly) through the "Pandora's Box" thread you linked. Indeed, it looks like there would be many ways to misuse such a feature. I do understand the importance of idempotency, and it looks like my use case is one of the first ever listed where I do need to persist state across retries to make my operator really idempotent.

I'm surprised no one came up with it before, given how frequent the Spark + Airflow combination is (well, the BigQueryOperator was one too, but it found another solution).

Of course we can blame it on Livy for being poorly conceived (unlike BigQuery), or we can blame it on Spark for not having a built-in security mechanism to prevent double writes, but I think that, as the above hacks show, you can't really prevent users from shooting themselves in the foot if that's what they really want to do.

While I do think that making things foolproof is important, I believe it's also in Python's philosophy to *not* make things foolproof to the detriment of simplicity for the right use cases. But I do understand that the use cases are different and contradictory: some would require the state to be persisted across reschedules and not retries; mine would require the state to be persisted across retries and not reschedules.

Maybe the Airflow-y way to do that would be to have one task that does the submit and an XCom with the job id, then one task that checks the progress of the job, but it feels very cumbersome to double the number of tasks just for that. Plus, I'm not sure we could make the first task retry if the second task fails...

Thanks again,

Furcy
On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]> wrote:

I think we've discussed several approaches like that, and using the XCom name (which for many people would mean "let's just extend the XCom table for that") is not a very good idea IMHO. I think this is very different functionality/logic, which we might or might not agree to implement as a community. Naming it "XCom" and trying to extend the XCom table behavior might be problematic.

Not sure if you are aware, but we had a very similar discussion about it recently (without clear conclusions, but at least you can see what kind of issues/problems different people have with this approach):
https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E

I am not saying it is impossible to do, but I think it's a matter of how we formulate the "use case". It's very tempting to implement a generic intra-task communication mechanism, indeed. But it can very easily lead to people abusing it and bypassing the guarantees (idempotency mainly) that Airflow provides for backfilling and re-running tasks. I thought a bit after the latest discussion kind of died out, and I have one possible solution to the problem.

Let me explain what I think about it (but others can have different opinions, of course).

So far the discussion was that there are several ways to achieve what you want (and it's really about which entity is providing the "idempotency" guarantee):

1) Similarly to the just-merged BigQuery insert job https://github.com/apache/airflow/pull/8868/files - you can provide the job_id from outside. You'd need to work out the job_id naming that works in your case and make sure that when you re-run your task with the same (dag_id, task_id, execution_date) you will get the same id. Then the "uniqueness", and thus idempotency, is handled by the logic written in the DAG.

2) Store the job id in some external storage (via one of the hooks, where it can be queried in a way that works for you). Then the idempotency is actually handled by the logic in your operator + some external storage.

3) Query your service and retrieve the job id from it - but you have to have a way to query for the job related to your dag_id + task_id + execution_date. Then the idempotency is actually handled by the service you are using.

In the use case you describe, this is the only thing you need: an "idempotency source". I believe you would like to get case 2) from above, but without having to use external storage to store the "unique id". Something that will let each task in the same dag run set or retrieve a unique value for that particular task. One value should be enough, assuming that each operator/task works on one external data "source".

My current thinking is:

Why don't we provide such a dedicated idempotency service inside Airflow? We already have a DB, and we could have an "IdempotencyIdStorage" class with two methods:

* .set(id: str) and
* .get() -> str

And the data stored there should be a string keyed by (dag_id, task_id, execution_date) - available also via Jinja templating. There is no intra-task communication here and very little possibility of abuse, and it seems to solve the major pain point where you have to provide your own storage to get idempotency if your service does not provide one or you do not want to delegate it to the DAG writer.

J.
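(A minimal sketch of the interface Jarek outlines, with a dict standing in for the metastore table; submit_batch is a hypothetical helper, not a real Airflow or Livy API.)

    # The dict stands in for a dedicated metastore table keyed by
    # (dag_id, task_id, execution_date); a real version would persist to the DB.
    _idempotency_ids = {}

    class IdempotencyIdStorage:
        def __init__(self, dag_id, task_id, execution_date):
            self._key = (dag_id, task_id, execution_date)

        def set(self, id: str) -> None:
            _idempotency_ids[self._key] = id

        def get(self) -> str:
            return _idempotency_ids.get(self._key)

    def submit_batch():
        """Hypothetical helper: POST a new batch job to Livy, return its id."""
        return "42"

    # Usage, e.g. from an operator's execute(): submit only when no id was
    # stored by a previous try; otherwise re-attach to the already-running job.
    storage = IdempotencyIdStorage("my_dag", "my_task", "2020-06-01T00:00:00")
    job_id = storage.get()
    if job_id is None:
        job_id = submit_batch()
        storage.set(job_id)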
On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:

The use case I'm referring to is that you can't use XCom to let a task read information from its past attempts, because when a task starts, its XCom is automatically deleted.

My specific use case is that we have a custom LivyOperator that calls Livy to start batch Spark jobs. When you start a batch job, Livy returns a job_id. Sometimes our operator can fail for one reason or another (for instance, if Livy is unreachable for a while). When the task retries, it calls Livy again, which starts the same Spark job; but the problem is that the Spark job from the first attempt can still be running, and then we have a batch job that runs twice simultaneously and creates duplicates in the output.

What we tried to do is get the job_id from the first try, check whether the job is still running, and wait for it to complete if it is.

We tried using XCom to let the task send a message to itself (to its next try), but XCom is meant for "inter-task communication" only, so this doesn't work and is not intended to work.

On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:

Hi Furcy,

Can you give a concrete example of what you mean by intra-task xcom? Depending on your use case, this may already be possible.

On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:

Hello,

I would like to open a feature request for Airflow to support "intra-task xcom".

It seems that there are several distinct use cases for it already, and only ugly workarounds, and I wanted to list them in a JIRA ticket. I wanted to summarize links to the use cases and past attempts, and the recommended approach (which apparently would be to create a feature distinct from XCom to support this; it could be called intra-com or self-com?).

Do you know if such a ticket already exists? I couldn't find one. Also, I can't create any ticket due to some obscure bug (see my other email).

Thanks,

Furcy

--

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer
M: +48 660 796 129
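(For completeness: the recovery step Furcy describes, re-attaching to a batch started by a previous try, maps onto Livy's REST API roughly as below; the endpoint URL is an assumption.)

    import time

    import requests

    LIVY_URL = "http://livy:8998"  # hypothetical Livy endpoint

    def batch_state(batch_id):
        # Livy's REST API: GET /batches/{batchId}/state -> {"id": ..., "state": ...}
        resp = requests.get("{}/batches/{}/state".format(LIVY_URL, batch_id))
        resp.raise_for_status()
        return resp.json()["state"]

    def wait_for_previous_attempt(batch_id, poll_seconds=30):
        # Block until the batch started by an earlier try finishes,
        # instead of submitting the same job a second time.
        while batch_state(batch_id) in ("starting", "running"):
            time.sleep(poll_seconds)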
