Airflow already provides a mechanism for state persistence: the Variable,
and, with caveats and flaws, XCom.

I personally persist state to the Airflow metastore database for a large
percentage of our jobs.  They are incremental jobs, and it is helpful to
keep track of a watermark.

I think that incremental jobs are very common in Airflow
implementations, though users often resort to imperfect vehicles for this,
such as `execution_date` or XCom.
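
For illustration, the watermark pattern looks roughly like this (a minimal
sketch; `run_extract` and the variable key are made-up names, not from any
real DAG):

    from airflow.models import Variable

    def pull_incremental_data(**context):
        watermark_key = "my_incremental_job_high_watermark"  # hypothetical key, one per job
        # fall back to an assumed initial value on the very first run
        last_watermark = Variable.get(watermark_key, default_var="1970-01-01T00:00:00")
        # placeholder for the actual incremental load
        new_watermark = run_extract(since=last_watermark)
        # persist the new high watermark for the next run
        Variable.set(watermark_key, new_watermark)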

I have a very drafty draft AIP that I haven't had enough time to work on,
which explores adding explicit support for state persistence to Airflow:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
Though I understand it is a controversial idea.  (Note: the AIP is not
ready for primetime.)

I am of the mind that being able to persist some state is not a fundamental
change to Airflow; it would just add explicit (and more user-friendly)
support for something that is already quite common, and it fits fully within
the wheelhouse of what Airflow exists to do.




On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]> wrote:

> Furcy,
>
> To clarify, when I say that Airflow should not be in the business of
> keeping state about external systems, I specifically mean it shouldn't be
> keeping state to be shared between task instances. I completely understand
> that there may be external systems that are harder to work with, and which,
> like in your case, require the operator to be able to store some piece of
> information to make them idempotent. I just don't think that Airflow should
> provide that storage mechanism.
>
> I would think that most users of Airflow have access to some sort of cloud
> storage like S3 (which is really just a key-value store), and it's easy
> enough to write your job_id or whatever value you care about to a file with
> a prefix computed from the dag_id, task_id, execution_date or whatever
> combination of them you care about. Yes it makes your operators more
> complex and they have to know about another system, but it keeps that
> complexity out of core Airflow. That's the trade off.
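>
> Roughly the pattern I have in mind (just a sketch, using boto3 and a made-up
> bucket/prefix; an S3 hook would work just as well):
>
>     import boto3
>
>     def persist_job_id(job_id, dag_id, task_id, execution_date, bucket="my-state-bucket"):
>         # the key is derived from the task instance, so a re-run of the same
>         # (dag_id, task_id, execution_date) always finds the same object
>         key = f"airflow-state/{dag_id}/{task_id}/{execution_date.isoformat()}/job_id"
>         boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=job_id.encode())
>
>     def read_job_id(dag_id, task_id, execution_date, bucket="my-state-bucket"):
>         key = f"airflow-state/{dag_id}/{task_id}/{execution_date.isoformat()}/job_id"
>         # raises a ClientError if nothing was written for this task instance yet
>         obj = boto3.client("s3").get_object(Bucket=bucket, Key=key)
>         return obj["Body"].read().decode()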
>
> Ash,
> I'm not suggesting that XCom be removed from Airflow, and I understand
> there are use cases where it makes some things convenient. In your example
> though, it would be just as easy for the sensor to write the found object
> path as the contents of another file in S3, with a computable prefix based
> on the dag/task/execution_date.
>
>
> At its heart XCom is just a key-value store where the keys are limited to
> a very specific set of possibilities, and where key-value pairs are managed
> in some specific ways. The request here is to add another narrowly defined
> set of allowable keys, and as far as I can tell with no extra management of
> them. The only real advantage of using the Airflow database for XCom or any
> expansion/variation on it is that we know that all operators have access to
> the database.
>
> I'm not an expert but I would wonder how well Postgres or MySQL perform as
> high-volume key-value stores. Does anyone actually use XCom at scale, and
> does that extra load on the database impact scheduling and other
> performance aspects of Airflow?
>
> Chris
>
>
> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]> wrote:
>
>> Just to touch on one point about XCom, and to reassure people that they,
>> or something very like them, are in Airflow for the foreseeable future.
>>
>> As an example of an appropriate use for XCom: Let's say a third party
>> delivers you a set of files once a week, but the exact name of the files
>> isn't known (by you) in advance. So you write a sensor that polls/checks S3
>> for the objects to appear in your bucket, and the sensor outputs the S3
>> object path to XCom, which the next processing step then examines to
>> process the files.
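>>
>> Sketching it very roughly (1.10-era import paths; the bucket, prefix and
>> key names are made up):
>>
>>     from airflow.hooks.S3_hook import S3Hook
>>     from airflow.sensors.base_sensor_operator import BaseSensorOperator
>>
>>     class S3ObjectSensor(BaseSensorOperator):
>>         """Waits for any object under a prefix and records its key in XCom."""
>>         def poke(self, context):
>>             keys = S3Hook().list_keys(bucket_name="incoming-bucket", prefix="thirdparty/")
>>             if keys:
>>                 # hand the discovered object path to the next task
>>                 context["ti"].xcom_push(key="object_path", value=keys[0])
>>                 return True
>>             return False
>>
>>     # the processing task reads the path back, e.g. via a templated argument:
>>     #   "{{ ti.xcom_pull(task_ids='wait_for_files', key='object_path') }}"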
>>
>> That sort of use case is not going anywhere.
>>
>> Cheers,
>> -ash
>>
>> On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:
>>
>> At the risk of repeating myself (from the previous thread that touched on
>> this topic), I don't think Airflow should be in the business of keeping
>> state about external systems. Airflow is about authoring and running
>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>> convinced that the existing XCom functionality should really be a part of
>> Airflow, and I certainly don't think it should be expanded upon or new
>> variations added. I think storing state is especially risky, if for no
>> other reason than the fact that Airflow is not the source of truth about
>> those systems. It's very likely that at times the "state" that Airflow
>> has saved will diverge from the actual state of the external system.
>> Handling that nicely probably requires a bunch of custom code in the
>> operators/hooks anyway, so I don't think it saves anything in terms of
>> operator code complexity. Users would be much better served going to the
>> source of truth to determine state. If part of the problem is that Livy is
>> lacking in features (like being able to query the status of a particular
>> job_id) then I think it would be more appropriate to add the needed
>> features to that project. Airflow at its core shouldn't be concerned with
>> making up for failures of other tools.
>>
>> Also as can be seen by just this discussion, it's hard to keep these
>> extra features from expanding in scope. Jarek proposed something that would
>> just store a single string, and immediately Furcy wants to expand it to
>> store multiple strings. Either way we are really just talking about a
>> key-value store, and putting limits on how that key can be structured; the
>> key is made up of some predefined set of Airflow entities (for Jarek's
>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>> proposal).
>>
>> I know in the past that I had a situation where I wanted to reuse a
>> cluster across multiple data intervals, if one was already running (this
>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>> can equally see use cases where I might want to share some resource for
>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>> added this then why limit it to any one of those combinations? But then we
>> just have an arbitrary key-value store. If you want to use Airflow for that
>> then you can use Variables, if you want to use something else then you can.
>>
>> Unless Airflow is doing some extra management of these key-values in some
>> way (like it does with clearing out XCom's on reruns), then I see
>> absolutely no added benefit. And even with some potential management by
>> Airflow I'm still not convinced that Airflow is the right place for it.
>>
>> Chris
>>
>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:
>>
>> Thank you Jarek for the detailed explanation,
>>
>> That's exactly what I wanted to do: write a feature request to summarize
>> all those discussions.
>> I agree with you that the feature should be marked distinct from the XCom
>> feature and that we should not piggyback this feature into XCom.
>>
>> The crux of the problem, I think, is that with XCom you do want the task
>> to delete its xcom at the beginning of the retry.
>> Correct me if I'm wrong, but one use case where it was necessary was
>> having a task A and a task B that starts immediately after A and waits for
>> some 'signal' from A.
>> If A and B restart and A doesn't reset its signal, then B will use the
>> signal from A's first try, which is incorrect.
>>
>> About the 3 solutions you mention:
>>
>> 1) Providing the job_id from outside. That works indeed. Sadly, in my
>> use case Livy's API is poorly designed and only returns a generated job_id;
>> you can't specify a custom one.
>> You can't even find a job by name: I would have to list all the active
>> job_ids, and do a GET for each of them to get its name and find which one
>> is the one I want. It's doable but inelegant (a rough sketch follows after
>> point 3 below).
>>
>> 2) Store the id in an external storage. Of course it would work but it
>> requires an external storage. More on that below.
>>
>> 3) I'm not sure I understand completely what you mean there, but I think
>> you mean that the idempotency can be handled by the service you call (for
>> instance BigQuery). Indeed that is another solution. If we were using Spark
>> with a Hive metastore + locking, or the Delta Lake storage format, we could
>> have something to prevent a job that runs twice from creating duplicates.
>> This is another solution we are considering, but it is costly to change now.
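>>
>> To illustrate why 1) is inelegant with Livy, the name lookup would look
>> something like this (a rough sketch using requests; the endpoint is made up,
>> and I'm assuming the batch info exposes the name as described above):
>>
>>     import requests
>>
>>     LIVY = "http://livy-host:8998"  # hypothetical endpoint
>>
>>     def find_batch_id_by_name(name):
>>         # list the active batches, then GET each one to inspect its name
>>         for batch in requests.get(f"{LIVY}/batches").json().get("sessions", []):
>>             info = requests.get(f"{LIVY}/batches/{batch['id']}").json()
>>             if info.get("name") == name:
>>                 return batch["id"]
>>         return None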
>>
>> You guessed correctly: the feature I was asking for would be to
>> provide some utility to let users implement solution 2) without
>> requiring an external storage.
>> I think it would be a QOL improvement for some use cases, just like it
>> could be argued that XCom is just a QOL improvement and users could have
>> used an external storage themselves.
>> The main advantage that it brings is making the custom operators much
>> easier to share and reuse across the Apache Airflow community, compared to
>> having to set up some external
>> storage.
>>
>> I have seen that some users used the metadata store itself as an external
>> storage by adding a new table to the airflow model:
>>
>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e
>>
>> And others suggested using XCom itself as an external storage by storing
>> information with a special task_id:
>> https://stackoverflow.com/a/57515143/2087478
>>
>> In the discussion thread you provided it was also suggested to use
>> Variables to store some persistent information.
>>
>> These 3 approaches work but feel quite "hacky" and I believe that
>> providing such functionality would be good.
>>
>> Finally, I don't see the point of limiting the functionality to such an
>> extent: providing an "IdempotencyIdStorage" that only allows you to store a
>> string will just force people who need to store more than one id for one
>> task (for whatever reason) to use some hack again, like storing a JSON
>> blob inside the storage.
>>
>> I was more thinking about something quite similar to XCom (I liked the
>> XState name suggestion), where the entry would be keyed by "(dag_id,
>> task_id, execution_date, key)"
>> where "key" can be whatever you want and would be kept across retries.
>>
>> I have read (quickly) through the "Pandora's Box" thread you linked.
>> Indeed, it looks like there would be many ways to misuse such a feature.
>> I do understand the importance of idempotency, and it looks like my use
>> case is one of the first ever listed where I do need to persist a state
>> across retries to make my operator really idempotent.
>>
>> I'm surprised no one came up with it given how frequent the Spark +
>> Airflow combination is (well, the BigQueryOperator was one too, but it found
>> another solution).
>>
>> Of course we can blame it on Livy for being poorly conceived (unlike
>> BigQuery) or we can blame it on Spark for not having a built-in security
>> mechanism to prevent double-writes,
>> but I think that as the above hacks show, you can't really prevent users
>> from shooting themselves in the foot if that's what they really want to do.
>>
>> While I do think that making things foolproof is important, I believe
>> it's also in Python's philosophy to *not* make things foolproof to the
>> detriment of simplicity for the right use cases.
>> But I do understand that the use cases are different and contradictory:
>> some would require the state to be persisted across reschedule and not
>> retries, mine would require the state to be persisted across retries and
>> not reschedule.
>>
>> Maybe the Airflow-y way for that would be to have one task that does the
>> submit and pushes an xcom with the job_id, then one task that checks the
>> progress of the job, but it feels very cumbersome to double the number of
>> tasks just for that. Plus I'm not sure we could make the first task retry if the
>> second task fails...
>>
>> Thanks again,
>>
>> Furcy
>>
>>
>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]>
>> wrote:
>>
>> I think we've discussed several approaches like that, and using the XCom
>> name (which for many people would mean "let's just extend the XCom table for
>> that") is not a very good idea IMHO. I think this is very
>> different functionality/logic which we might or might not agree to
>> implement as a community. Naming it "XCom" and trying to extend the XCom
>> table behavior might be problematic.
>>
>> Not sure if you are aware, but we had a very similar discussion about it
>> recently (without clear conclusions, but at least you can see what kind of
>> issues/problems different people have with this approach):
>>
>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>
>> I am not saying it is impossible to do, but I think it's a matter of how
>> we formulate the "use case". It's very tempting to implement a generic -
>> intra-task communication mechanism, indeed. But it can very easily lead to
>> people abusing it and bypassing the guarantees (idempotency mainly) that
>> Airflow provides for backfilling and re-running tasks. I thought a bit
>> after the latest discussion kind of died out, and I have one possible
>> solution to the problem.
>>
>> Let me explain what I think about it (but others can have different
>> opinions of course):
>>
>> So far the discussion was that there are several ways to achieve what you
>> want (and it's really about which entity is providing the "idempotency"
>> guarantee):
>>
>> 1) Similarly to what was just merged in the BigQuery Insert Job
>> https://github.com/apache/airflow/pull/8868/files - you can provide the
>> job_id from outside. You'd need to work out a job_id naming scheme that works
>> in your case and make sure that when you re-run your task with the same
>> (dag_id, task_id, execution_date) you will get the same id. Then the
>> "uniqueness", and thus idempotency, is handled by the logic written in the
>> DAG (see the sketch after point 3 below).
>>
>> 2) Store the job id in some external storage (via one of the hooks -
>> where it can be queried in the way that will work for you). Then the
>> idempotency is actually handled by the logic in your Operator + some
>> external storage.
>>
>> 3) Query your service and retrieve the JOB ID from it - but you have to
>> have a way to query for the job related to your "dag_id + task_id
>> + execution_date". Then the idempotency is actually handled by the
>> service you are using.
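>>
>> A short sketch of option 1): the DAG derives a deterministic id from the
>> task instance via templating (the operator and its job_name field are
>> hypothetical here):
>>
>>     run_batch = MyBatchOperator(
>>         task_id="run_batch",
>>         # same (dag_id, task_id, execution_date) => same name on every re-run
>>         job_name="{{ dag.dag_id }}__{{ task.task_id }}__{{ ts_nodash }}",
>>     )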
>>
>> In the use case you describe, this is the only thing you need - an
>> "idempotency source". I believe you would like to get case 2) from
>> above, but without having to use external storage to store the "unique id":
>> something that will let each task in the same dag run set or retrieve a
>> unique value for that particular task. One value should be enough -
>> assuming that each operator/task works on one external data "source".
>>
>> My current thinking is:
>>
>> Why don't we provide such a dedicated idempotency service inside
>> Airflow? We already have a DB, and we could have an "IdempotencyIdStorage"
>> class with two methods:
>>
>>   * .set(id: str) and
>>   * .get() -> str
>>
>> And the data stored there should be a string keyed by "(dag_id, task_id,
>> execution_date)" - available also via Jinja templating. There is no
>> intra-task communication here, very little possibility of abuse, and it
>> seems to solve the major pain point where you have to provide your own
>> storage to get the idempotency if your service does not provide one or you
>> do not want to delegate it to the DAG writer.
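>>
>> Very roughly (a sketch of the interface, not an implementation):
>>
>>     class IdempotencyIdStorage:
>>         def __init__(self, dag_id, task_id, execution_date):
>>             # one row per (dag_id, task_id, execution_date) in the Airflow DB
>>             self.key = (dag_id, task_id, execution_date)
>>
>>         def set(self, id: str) -> None:
>>             ...  # upsert the string for self.key in the metadata database
>>
>>         def get(self) -> str:
>>             ...  # return the stored string (or None if nothing was set yet)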
>>
>> J.
>>
>>
>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:
>>
>> The use case I'm referring to is that you can't use xcom to let a task
>> read information from its past attempts, because when a task starts, its
>> xcom is automatically deleted.
>>
>> My specific use case is that we have a custom LivyOperator that calls
>> Livy to start batch Spark Jobs.
>> When you start a batch job, Livy returns a job_id.
>> Sometimes our operator can fail for one reason or another (for instance
>> if Livy is unreachable for a while).
>> When the task retries, it calls Livy again, which starts the same Spark
>> job, but the problem is that the Spark job from the first attempt can still
>> be running,
>> and then we have a batch job that runs twice simultaneously and creates
>> duplicates in the output.
>>
>> What we tried to do is get the job_id from the first try, to check if
>> the job is still running, and wait for it to complete if it is.
>>
>> We tried using xcom to let the task send a message to itself (to its
>> next try), but xcom is meant for "inter-task communication" only, so this
>> doesn't work and is not intended to work.
>>
>>
>>
>>
>>
>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>>
>> Hi Furcy,
>>
>> Can you give a concrete example of what you mean by intra-task xcom?
>> Depending on your use case this may already be possible.
>>
>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>
>> Hello,
>>
>> I would like to open a feature request for Airflow to support "intra-task
>> xcom".
>>
>> It seems that there are several distinct use cases for it already,
>> and only ugly workarounds, and I wanted to list them in a JIRA ticket.
>>
>> I wanted to summarize links to the use cases and past attempts,
>> and the recommended approach (which apparently would be to create
>> a distinct feature from xcom to support this; it could be called intra-com
>> or self-com?)
>>
>> Do you know if such ticket already exists? I couldn't find one.
>> Also I can't create any ticket due to some obscure bug (see my other
>> email).
>>
>> Thanks,
>>
>> Furcy
>>
>>
>>
>> --
>>
>> Jarek Potiuk
>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>
>> M: +48 660 796 129 <+48660796129>
>>
>>
