Thank you Chris for your answer. I agree with you that if there is something to fix, it's more on the Livy side. I also did not consider the Variables feature, mostly because I saw it as a way to pass mutable configuration to tasks, and I did not know that tasks could set Variables too.
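Concretely, the Variables-based workaround being discussed would look roughly like the sketch below: persist the Livy batch id in a Variable keyed by (dag_id, task_id, ds), so that a retry can re-attach to the job started by a previous try instead of resubmitting it. Everything here is illustrative (the operator name, the key scheme, the payload and the polling logic are made up for the example); the Livy endpoints follow its public REST batches API but should be checked against the version actually deployed.

```python
# Minimal sketch only: a Livy batch operator that remembers its batch id in an
# Airflow Variable so that a retry can re-attach to the job started by the
# previous try instead of submitting a duplicate.
import time

import requests

from airflow.models import BaseOperator, Variable


class LivyBatchOperator(BaseOperator):

    def __init__(self, livy_url, batch_payload, poke_interval=30, **kwargs):
        super().__init__(**kwargs)
        self.livy_url = livy_url.rstrip("/")
        self.batch_payload = batch_payload  # e.g. {"file": "hdfs:///jobs/my-app.jar"}
        self.poke_interval = poke_interval

    def execute(self, context):
        # One Variable per (dag_id, task_id, ds): retries of the same task
        # instance see the batch id written by the previous try.
        key = "livy_batch_id__{}__{}__{}".format(
            context["dag"].dag_id, context["task"].task_id, context["ds"]
        )

        batch_id = Variable.get(key, default_var=None)
        if batch_id is not None:
            state = self._state(batch_id)
            if state == "success":
                return batch_id  # an earlier try already finished the job
            if state not in ("starting", "running"):
                batch_id = None  # earlier try is dead or unknown: resubmit

        if batch_id is None:
            resp = requests.post(self.livy_url + "/batches", json=self.batch_payload)
            resp.raise_for_status()
            batch_id = resp.json()["id"]
            # Persist the id *before* anything else can fail.
            Variable.set(key, str(batch_id))

        # Wait for the new (or inherited) batch to finish.
        while True:
            state = self._state(batch_id)
            if state == "success":
                return batch_id
            if state in ("dead", "killed", "error"):
                raise RuntimeError("Livy batch {} ended in state {}".format(batch_id, state))
            time.sleep(self.poke_interval)

    def _state(self, batch_id):
        resp = requests.get("{}/batches/{}/state".format(self.livy_url, batch_id))
        if resp.status_code == 404:
            return "not_found"
        resp.raise_for_status()
        return resp.json()["state"]
```

The same pattern works with any persistent key-value store; Variables just avoid having to stand up a separate one.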
I guess we may try using this as a workaround, thank you for the suggestion.

If I may, I would like to temper your argument that "Airflow should not be in the business of keeping state about external systems". While I agree with you on that, I would tend to say that while "checking the state of external systems" is not Airflow's business, it is the exact business of Airflow operators. As you said, operators should rather check the source of truth, but after doing so an operator will be keeping the state of the external system in RAM. The question then is: how should failure be handled by the operator? For instance, my LivyOperator regularly pokes Livy to know the state of the running job. If there is a network interruption between the operator and the external system, what should happen? Should the operator handle the retries itself? If yes, for how long? What if the operator itself fails for another, completely unexpected reason?

If the system isn't itself designed to be idempotent (the way an RDBMS is), isn't it the operator's job to behave in an idempotent way? Or should we design a whole system in between the operator and the external system to handle the idempotency? Or just use another system? If we say that it's the operator's job to be idempotent despite the system, then to be able to do that it might need to know some information from its previous executions (to perform cleanup or check a state), and since any state stored outside the external system might diverge, it must also be able to check that it did not. But if an operator does exactly that, I don't see why it should be considered a bad thing to let it use some kind of provided storage.

Also, sadly, it is often easier to write a temporary workaround for a recurring production issue in your custom operator than in the external system, which you don't own (and some systems, unlike Livy, aren't even open source).

Thanks again for the explanation,

Furcy

On Mon, 1 Jun 2020 at 20:38, Chris Palmer <[email protected]> wrote:

> At the risk of repeating myself (from the previous thread that touched on this topic), I don't think Airflow should be in the business of keeping state about external systems. Airflow is about authoring and running workflows; it's not a messaging tool or a cluster management tool. I'm not convinced that the existing XCom functionality should really be a part of Airflow, and I certainly don't think it should be expanded upon or new variations added. I think storing state is especially risky, if for no other reason than the fact that Airflow is not the source of truth about those systems. It's very likely that at times the "state" that Airflow has saved will diverge from the actual state of the external system. Handling that nicely probably requires a bunch of custom code in the operators/hooks anyway, so I don't think it saves anything in terms of operator code complexity. Users would be much better served going to the source of truth to determine state. If part of the problem is that Livy is lacking in features (like being able to query the status of a particular job_id) then I think it would be more appropriate to add the needed features to that project. Airflow at its core shouldn't be concerned with making up for failures of other tools.
>
> Also as can be seen by just this discussion, it's hard to keep these extra features from expanding in scope.
> Jarek proposed something that would just store a single string, and immediately Furcy wants to expand it to store multiple strings. Either way we are really just talking about a key-value store, and putting limits on how that key can be structured; the key is made up of some predefined set of Airflow entities (for Jarek's proposal) or some arbitrary key along with those Airflow entities (Furcy's proposal).
>
> I know in the past that I had a situation where I wanted to reuse a cluster across multiple data intervals, if one was already running (this was before I discovered Airflow so wasn't "execution dates" precisely). I can equally see use cases where I might want to share some resource for multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we added this then why limit it to any one of those combinations? But then we just have an arbitrary key-value store. If you want to use Airflow for that then you can use Variables, if you want to use something else then you can.
>
> Unless Airflow is doing some extra management of these key-values in some way (like it does with clearing out XCom's on reruns), then I see absolutely no added benefit. And even with some potential management by Airflow I'm still not convinced that Airflow is the right place for it.
>
> Chris
>
> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:
>
>> Thank you Jarek for the detailed explanation,
>>
>> That's exactly what I wanted to do: write a feature request to summarize all those discussions. I agree with you that the feature should be kept distinct from the XCom feature and that we should not piggyback this feature into XCom.
>>
>> The crux of the problem, I think, is that with XCom you do want the task to delete its XCom at the beginning of the retry. Correct me if I'm wrong, but one use case where it was necessary was having a task A and a task B that starts immediately after A and waits for some 'signal' from A. If A and B restart and A doesn't reset its signal, then B will use the signal from A's first try, which is incorrect.
>>
>> About the 3 solutions you mention:
>>
>> 1) Providing the job_id from outside. That works indeed. Sadly, in my use case Livy's API is poorly designed and only returns a generated job_id; you can't specify a custom one. You can't even find a job by name: I would have to list all the active job_ids and do a GET for each of them to get its name and find which one is the one I want. It's doable but inelegant.
>>
>> 2) Store the id in an external storage. Of course it would work, but it requires an external storage. More on that below.
>>
>> 3) I'm not sure I understand completely what you mean there, but I think you mean that the idempotency can be handled by the service you call (for instance BigQuery). Indeed that is another solution. If we were using Spark with a Hive metastore + locking, or the Delta Lake storage format, we could have something to prevent a job that runs twice from creating duplicates. This is another solution we are considering, but it is costly to change now.
>>
>> You guess correctly that the feature I was asking for would be to provide some utility to let users implement solution 2) without requiring an external storage. I think it would be a QOL improvement for some use cases, just like it could be argued that XCom is just a QOL improvement and users could have used an external storage themselves.
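To make the "find the batch by name" workaround from 1) above concrete, a rough sketch follows. It assumes Livy's GET /batches listing endpoint, a per-batch GET, and that batches were submitted with a name field; the helper name is made up, and pagination is ignored for brevity.

```python
# Illustrative only: look up a previously submitted Livy batch by its name,
# since Livy only returns generated ids. Lists the known batches and inspects
# each one; endpoint and field names follow Livy's REST API but should be
# verified against the Livy version in use.
import requests


def find_batch_id_by_name(livy_url, batch_name):
    """Return the id of the first Livy batch whose name matches, or None."""
    base = livy_url.rstrip("/")
    listing = requests.get(base + "/batches")  # pagination ignored for brevity
    listing.raise_for_status()
    for session in listing.json().get("sessions", []):
        detail = requests.get("{}/batches/{}".format(base, session["id"]))
        detail.raise_for_status()
        if detail.json().get("name") == batch_name:
            return session["id"]
    return None
```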
>> The main advantage that it brings is making the custom operators much easier to share and reuse across the Apache Airflow community, compared to having to set up some external storage.
>>
>> I have seen that some users used the metadata store itself as an external storage by adding a new table to the airflow model:
>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e
>>
>> And others suggested using XCom itself as an external storage by storing information with a special task_id:
>> https://stackoverflow.com/a/57515143/2087478
>>
>> In the discussion thread you provided it was also suggested to use Variables to store some persisting information.
>>
>> These 3 approaches work but feel quite "hacky", and I believe that providing such functionality would be good.
>>
>> Finally, I don't see the point of limiting the functionality to such an extent: providing an "IdempotencyIdStorage" that only allows you to store a string will just force people who need to store more than one id for one task (for whatever reason) to use some hack again, like storing a JSON document inside the storage.
>>
>> I was more thinking about something quite similar to XCom (I liked the XState name suggestion), where the entry would be keyed by "(dag_id, task_id, execution_date, key)", where "key" can be whatever you want and would be kept across retries.
>>
>> I have read (quickly) through the "Pandora's Box" thread you linked. Indeed it looks like there would be many ways to misuse such a feature. I do understand the importance of idempotency, and it looks like my use case is one of the first ever listed where I do need to persist a state across retries to make my operator really idempotent.
>>
>> I'm surprised no one came up with it before, given how frequent the Spark + Airflow combination is (well, the BigQueryOperator was one too, but it found another solution).
>>
>> Of course we can blame it on Livy for being poorly conceived (unlike BigQuery), or we can blame it on Spark for not having a built-in safety mechanism to prevent double-writes, but I think that, as the above hacks show, you can't really prevent users from shooting themselves in the foot if that's what they really want to do.
>>
>> While I do think that making things foolproof is important, I believe it's also in Python's philosophy to *not* make things foolproof at the expense of simplicity for the right use cases. But I do understand that the use cases are different and contradictory: some would require the state to be persisted across reschedules but not retries; mine would require the state to be persisted across retries but not reschedules.
>>
>> Maybe the Airflow-y way to do this would be to have one task that does the submit and publishes an XCom with the job id, then another task that checks the progress of the job, but it feels very cumbersome to double the number of tasks just for that. Plus I'm not sure we could make the first task retry if the second task fails...
>>
>> Thanks again,
>>
>> Furcy
>>
>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]> wrote:
>>
>>> I think we've discussed several approaches like that, and using the XCom name (which for many people would mean "let's just extend the XCom table for that") is not a very good idea IMHO. I think this is very different functionality/logic which we might or might not agree to implement as a community.
>>> Naming it "XCom" while trying to extend the XCom table behavior might be problematic.
>>>
>>> Not sure if you are aware, but we had a very similar discussion about it recently (without clear conclusions, but at least you can see what kind of issues/problems different people have with this approach):
>>>
>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>
>>> I am not saying it is impossible to do, but I think it's a matter of how we formulate the "use case". It's very tempting to implement a generic intra-task communication mechanism, indeed. But it can very easily lead to people abusing it and bypassing the guarantees (idempotency mainly) that Airflow provides for backfilling and re-running tasks. I thought a bit after the latest discussion kind of died out, and I have one possible solution to the problem.
>>>
>>> Let me explain what I think about it (but others can have different opinions of course):
>>>
>>> So far the discussion was that there are several ways to achieve what you want (and it's really about which entity is providing the "idempotency" guarantee):
>>>
>>> 1) Similarly to what was just merged in the BigQuery Insert Job https://github.com/apache/airflow/pull/8868/files - you can provide the job_id from outside. You'd need to work out a job_id naming scheme that works in your case and make sure that when you re-run your task with the same (dag_id, task_id, execution_date) you will get the same id. Then the "uniqueness", and thus the idempotency, is handled by the logic written in the DAG.
>>>
>>> 2) Store the job id in some external storage (via one of the hooks - where it can be queried in the way that works for you). Then the idempotency is actually handled by the logic in your Operator + some external storage.
>>>
>>> 3) Query your service and retrieve the JOB ID from it - but you have to have a way to query for the job related to your "dag id + task + execution_date". Then the idempotency is actually handled by the Service you are using.
>>>
>>> In the use case you describe, this is the only thing you need - an "idempotency source". I believe you would like to get case 2) from above, but without having to use external storage to store the "unique id". Something that would let each task in the same dag run set or retrieve a unique value for that particular task. One value should be enough - assuming that each operator/task works on one external data "source".
>>>
>>> My current thinking is:
>>>
>>> Why don't we provide such a dedicated idempotency service inside Airflow? We already have a DB, and we could have an "IdempotencyIdStorage" class with two methods:
>>>
>>> * .set(id: str) and
>>> * .get() -> str
>>>
>>> And the data stored there should be a string keyed by "(dag_id, task_id, execution_date)" - available also via Jinja templating. There is no intra-task communication here, very little possibility of abuse, and it seems to solve the major pain point where you have to provide your own storage to get idempotency if your service does not provide one or you do not want to delegate it to the DAG writer.
>>>
>>> J.
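For illustration, a rough sketch of what such an IdempotencyIdStorage could look like if it were backed by a table in the metadata DB. The model, table name and session handling are assumptions made up for the example; nothing like this exists in Airflow, and a real implementation would need a migration to create the table plus some cleanup policy.

```python
# Purely illustrative sketch of the IdempotencyIdStorage idea: one string per
# (dag_id, task_id, execution_date), kept across retries. The model and table
# are invented for the example and would need to be created via a migration.
from sqlalchemy import Column, DateTime, String
from sqlalchemy.ext.declarative import declarative_base

from airflow import settings

Base = declarative_base()


class IdempotencyId(Base):
    """One row per (dag_id, task_id, execution_date)."""
    __tablename__ = "idempotency_id"

    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    value = Column(String(1000))


class IdempotencyIdStorage:
    """set()/get() a single string for one task instance, surviving retries."""

    def __init__(self, dag_id, task_id, execution_date):
        self.key = (dag_id, task_id, execution_date)

    def set(self, value: str) -> None:
        session = settings.Session()
        dag_id, task_id, execution_date = self.key
        # merge() inserts or updates the row for this primary key.
        session.merge(IdempotencyId(dag_id=dag_id, task_id=task_id,
                                    execution_date=execution_date, value=value))
        session.commit()
        session.close()

    def get(self) -> str:
        session = settings.Session()
        row = session.query(IdempotencyId).get(self.key)
        value = row.value if row is not None else None
        session.close()
        return value
```

Exposing an instance of this in the task context and template context would presumably cover the "available also via Jinja templating" part of the proposal.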
>>>
>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:
>>>
>>>> The use case I'm referring to is that you can't use XCom to let a task read information from its past attempts, because when a task starts, its XCom is automatically deleted.
>>>>
>>>> My specific use case is that we have a custom LivyOperator that calls Livy to start batch Spark jobs. When you start a batch job, Livy returns a job_id. Sometimes our operator can fail for one reason or another (for instance if Livy is unreachable for a while). When the task retries, it calls Livy again, which starts the same Spark job, but the problem is that the Spark job from the first attempt can still be running, and then we have a batch job that runs twice simultaneously and creates duplicates in the output.
>>>>
>>>> What we tried to do is get the job_id from the first try, to check if the job is still running, and wait for it to complete if it is.
>>>>
>>>> We tried using XCom to let the task send a message to itself (to its next try), but XCom is meant for "inter-task communication" only, so this doesn't work and is not intended to work.
>>>>
>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>>>>
>>>>> Hi Furcy,
>>>>>
>>>>> Can you give a concrete example of what you mean by intra-task xcom? Depending on your use case this may already be possible.
>>>>>
>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I would like to open a feature request for Airflow to support "intra-task xcom".
>>>>>
>>>>> It seems that there are several distinct use cases for it already, and only ugly workarounds, and I wanted to list them in a JIRA ticket. I wanted to summarize links to the use cases and past attempts, and the recommended approach (which apparently would be to create a feature distinct from XCom to support this; it could be called intra-com or self-com?).
>>>>>
>>>>> Do you know if such a ticket already exists? I couldn't find one. Also I can't create any ticket due to some obscure bug (see my other email).
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Furcy
>>>
>>> --
>>>
>>> Jarek Potiuk
>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>
>>> M: +48 660 796 129 <+48660796129>
>>> [image: Polidea] <https://www.polidea.com/>
