I definitely feel we can support these use cases by improving XCom. The
concept of XCom was to allow sharing messages & state between tasks.

Here is the first line from the docs about XCom:

XComs let tasks exchange messages, allowing more nuanced forms of control
and shared state. The name is an abbreviation of “cross-communication”.

I read the AIP (
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
) from Daniel Standish. The "namespacing" of this state would be a good
feature, and XCom already allows that with the "dag_id, task_id" key.
*Solution 1* in the AIP would solve the issue without much impact while
maintaining backwards compatibility.

I am against the idea of using Secrets Backend for storing "State". Storing
state for some kind of persistence should be "short-lived" and temporary.

The "writers" & "readers" of both (Secrets & State) are different.
Generally, Sysadmins / Team leads are responsible for managing secrets
(writing, rotating, auditing, etc.), whereas State is written by Airflow
Workers, would (or should) be short-lived, and you don't care about
auditing or rotating the value in "State".

The only problems that I can see in the current XCom implementation are 1)
the use of execution_date and 2) the fact that XComs are cleared when a
task starts.

One of the issues we already want to address in Airflow is removing the
hard requirement of "execution_date" for DagRun and TaskInstance. This
would also help in fixing (1) above.

(2) can be solved by a flag as mentioned in the AIP.
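
To make the namespacing and the flag a bit more concrete, here is a minimal,
purely illustrative sketch. It is not Airflow code and does not use the real
XCom API: the class name NamespacedState, the "persist" flag and the
clear_on_task_start method are all hypothetical names I made up for this
example, and the store is just an in-memory dict.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical sketch, not the Airflow XCom API.
# Rows are namespaced by (dag_id, task_id, key), with no execution_date.
Key = Tuple[str, str, str]


class NamespacedState:
    """Toy in-memory store keyed by (dag_id, task_id, key)."""

    def __init__(self) -> None:
        # Each row holds (value, persist_flag).
        self._rows: Dict[Key, Tuple[Any, bool]] = {}

    def set(self, dag_id: str, task_id: str, key: str, value: Any,
            persist: bool = False) -> None:
        # persist=True is the assumed opt-in flag: keep the row across task starts.
        self._rows[(dag_id, task_id, key)] = (value, persist)

    def get(self, dag_id: str, task_id: str, key: str,
            default: Optional[Any] = None) -> Any:
        row = self._rows.get((dag_id, task_id, key))
        return default if row is None else row[0]

    def clear_on_task_start(self, dag_id: str, task_id: str) -> None:
        """Drop this task's rows at task start, except rows flagged persist."""
        self._rows = {
            k: v for k, v in self._rows.items()
            if (k[0], k[1]) != (dag_id, task_id) or v[1]
        }


if __name__ == "__main__":
    state = NamespacedState()
    state.set("etl", "submit_job", "job_id", "42", persist=True)
    state.set("etl", "submit_job", "message", "hello")
    state.clear_on_task_start("etl", "submit_job")  # simulates a retry starting
    print(state.get("etl", "submit_job", "job_id"))
    print(state.get("etl", "submit_job", "message"))
```

In this sketch the flagged job_id survives the simulated task start while the
unflagged message is cleared, which is the current default behaviour.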

Regards,
Kaxil


On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <[email protected]>
wrote:

> I think this subject has come up so often that I am also slowly changing
> my mind in favor of making an explicit state persistence "service".
>
> Whether it's only one key or more is secondary, I think, but if users
> are already using Variables to keep state for tasks, this is a clear sign
> that we miss a crucial feature and our users are already abusing Airflow in
> the way we try to prevent by not introducing a "State service".
>
> With the recent SecretBackend implementation where Variables might be kept
> in a Secret backend - not only MetaStore - potentially you might have no
> write access to the backend. There is not even "write" support in the
> current "MetastoreBackend" implementation for writing variables. So we
> already have configurations where writing variables and reading them
> elsewhere might not work, as far as I can see. You can of course set
> several backends with the Metastore as the last fallback, but for me that
> opens up different problems: what happens if the key is present in both,
> one task writes it to the metastore, but another task reads it from the
> Secret Backend?
>
> I think it seems that variables are being abused in exactly the way we
> want to prevent the "StateService" from being abused - and shying away
> from that is really like closing our eyes and pretending it's not happening.
>
> So maybe we can make a change to the AIP with this approach:
>
> 1) Variables -> mostly read-only (for tasks)  and used to keep
> configuration shared between workers (but not on a task level).
> 2) StateService (or whatever we call it) where we keep state information
> for specific dag + task + execution_date.
>
> J.
>
>
> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <[email protected]>
> wrote:
>
>> Airflow already provides a mechanism for state persistence: the Variable,
>> and, with caveats and flaws, XCom.
>>
>> I personally persist state to the airflow metastore database for a large
>> percentage of our jobs.  They are incremental jobs and it is helpful to
>> keep track of a watermark.
>>
>> I think that incremental jobs are probably very common in airflow
>> implementations, though users probably often resort to imperfect
>> vehicles for this, such as `execution_date` or xcom.
>>
>> I have a very drafty draft AIP that I haven't had enough time to work
>> on, which explores adding explicit support for state persistence to
>> airflow:
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>> Though I understand it is a controversial idea.  (note: The AIP is not
>> ready for primetime.)
>>
>> I am of the mind that being able to persist some state is not a
>> fundamental change to airflow and would just add explicit (and more
>> user-friendly) support for something that is already quite common, and fits
>> fully within the wheelhouse of what airflow exists to do.
>>
>>
>>
>>
>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]> wrote:
>>
>>> Furcy,
>>>
>>> To clarify, when I say that Airflow should not be in the business of
>>> keeping state about external systems, I specifically mean it shouldn't be
>>> keeping state to be shared between task instances. I completely understand
>>> that there may be external systems that are harder to work with, and like
>>> in your case require the operator to be able to store some piece of
>>> information to make them idempotent. I just don't think that Airflow should
>>> provide that storage mechanism.
>>>
>>> I would think that most users of Airflow have access to some sort of
>>> cloud storage like S3 (which is really just a key-value store), and it's
>>> easy enough to write your job_id or whatever value you care about to a file
>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>> combination of them you care about. Yes, it makes your operators more
>>> complex and they have to know about another system, but it keeps that
>>> complexity out of core Airflow. That's the trade-off.
>>>
>>> Ash,
>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>> there are use cases where it makes some things convenient. In your example
>>> though, it would be just as easy for the sensor to write the found object
>>> path as the contents of another file in S3, with a computable prefix based
>>> on the dag/task/execution_date.
>>>
>>>
>>> At its heart XCom is just a key-value store where the keys are limited
>>> to a very specific set of possibilities, and where key-value pairs are
>>> managed in some specific ways. The request here is to add another narrowly
>>> defined set of allowable keys, and as far as I can tell with no extra
>>> management of them. The only real advantage of using the Airflow database
>>> for XCom or any expansion/variation on it is that we know that all
>>> operators have access to the database.
>>>
>>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>>> as high-volume key-value stores. Does anyone actually use XCom at scale,
>>> and does that extra load on the database impact scheduling and other
>>> performance aspects of Airflow?
>>>
>>> Chris
>>>
>>>
>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]> wrote:
>>>
>>>> Just to touch on one point about XCom, and to reassure people that
>>>> XComs, or something very like them, are in Airflow for the foreseeable
>>>> future.
>>>>
>>>> As an example of an appropriate use for XCom: Let's say a third party
>>>> delivers you a set of files once a week, but the exact name of the files
>>>> isn't known (by you) in advance. So you write a sensor that polls/checks S3
>>>> for the Objects to appear in your bucket, and the sensor outputs the S3
>>>> Object path to XCom, which the next processing step then examines to
>>>> process the files.
>>>>
>>>> That sort of use case is not going anywhere.
>>>>
>>>> Cheers,
>>>> -ash
>>>>
>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:
>>>>
>>>> At the risk of repeating myself (from the previous thread that touched
>>>> on this topic), I don't think Airflow should be in the business of keeping
>>>> state about external systems. Airflow is about authoring and running
>>>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>>>> convinced that the existing XCom functionality should really be a part of
>>>> Airflow, and I certainly don't think it should be expanded upon or new
>>>> variations added. I think storing state is especially risky, if for no
>>>> other reason than the fact that Airflow is not the source of truth about
>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>> has saved will diverge from the actual state of the external system.
>>>> Handling that nicely probably requires a bunch of custom code in the
>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>> operator code complexity. Users would be much better served going to the
>>>> source of truth to determine state. If part of the problem is that Livy is
>>>> lacking in features (like being able to query the status of a particular
>>>> job_id) then I think it would be more appropriate to add the needed
>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>> making up for failures of other tools.
>>>>
>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>> extra features from expanding in scope. Jarek proposed something that would
>>>> just store a single string, and immediately Furcy wants to expand it to
>>>> store multiple strings. Either way we are really just talking about a
>>>> key-value store, and putting limits on how that key can be structured; the
>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>> proposal).
>>>>
>>>> I know in the past that I had a situation where I wanted to reuse a
>>>> cluster across multiple data intervals, if one was already running (this
>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>> can equally see use cases where I might want to share some resource for
>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>> added this then why limit it to any one of those combinations? But then we
>>>> just have an arbitrary key-value store. If you want to use Airflow for that
>>>> then you can use Variables, if you want to use something else then you can.
>>>>
>>>> Unless Airflow is doing some extra management of these key-values in
>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>> absolutely no added benefit. And even with some potential management by
>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>
>>>> Chris
>>>>
>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:
>>>>
>>>> Thank you Jarek for the detailed explanation,
>>>>
>>>> That's exactly what I wanted to do: write a feature request to
>>>> summarize all those discussions.
>>>> I agree with you that the feature should be marked distinct from the
>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>
>>>> The crux of the problem, I think, is that with XCom you do want the task
>>>> to delete its xcom at the beginning of the retry.
>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>> having a task A and a task B that starts immediately after A and waits for
>>>> some 'signal' from A.
>>>> If A and B restart and A doesn't reset its signal, then B will use the
>>>> signal from A's first try, which is incorrect.
>>>>
>>>> About the 3 solutions you mention:
>>>>
>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>> use-case Livy's API is poorly designed and only returns a generated job_id,
>>>> you can't specify a custom one.
>>>> You can't even find a job by name: I would have to list all the active
>>>> job_ids, and do a GET for each of them to get its name and find which one
>>>> is the one I want. It's doable but inelegant.
>>>>
>>>> 2) Store the id in an external storage. Of course it would work but it
>>>> requires an external storage. More on that below.
>>>>
>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>> think you mean that the idempotency can be handled by the service you call
>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>> could have something to prevent a job that runs twice from creating
>>>> duplicates. This is another solution we are considering, but it is
>>>> costly to change now.
>>>>
>>>> You guess correctly that the feature I was asking for would be to
>>>> provide some utility to let the users implement solution 2) without
>>>> requiring an external storage.
>>>> I think it would be a QOL improvement for some use cases, just like it
>>>> could be argued that XCom is just a QOL improvement and users could have
>>>> used an external storage themselves.
>>>> The main advantage that it brings is making the custom operators much
>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>> having to set up some external storage.
>>>>
>>>> I have seen that some users used the metadata store itself as an
>>>> external storage by adding a new table to the airflow model:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e
>>>>
>>>> And others suggested using XCom itself as an external storage by
>>>> storing information with a special task_id:
>>>> https://stackoverflow.com/a/57515143/2087478
>>>>
>>>> In the discussion thread you provided it was also suggested to use
>>>> Variables to store some persisting information.
>>>>
>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>> providing such functionality would be good.
>>>>
>>>> Finally, I don't see the point of limiting the functionality to such an
>>>> extent: providing an "IdempotencyIdStorage" that only allows you to store
>>>> a string will just force people who need to store more than one id for
>>>> one task (for whatever reason) to use some hack again, like storing a json
>>>> inside the storage.
>>>>
>>>> I was more thinking about something quite similar to XCom (I liked the
>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>> task_id, execution_date, key)"
>>>> where "key" can be whatever you want and would be kept across retries.
>>>>
>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>> Indeed it looks like there would be many ways to misuse such a feature.
>>>> I do understand the importance of idempotency, and it looks like my use
>>>> case is one of the first ever listed where I do need to persist a state
>>>> across retries to make my operator really idempotent.
>>>>
>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>> another solution).
>>>>
>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>> mechanism to prevent double-writes,
>>>> but I think that as the above hacks show, you can't really prevent
>>>> users from shooting themselves in the foot if that's what they really want
>>>> to.
>>>>
>>>> While I do think that making things foolproof is important, I believe
>>>> it's also in Python's philosophy to *not* make things foolproof at the
>>>> detriment of simplicity for the right use cases.
>>>> But I do understand that the use cases are different and contradictory:
>>>> some would require the state to be persisted across reschedule and not
>>>> retries, mine would require the state to be persisted across retries and
>>>> not reschedule.
>>>>
>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>> the submit and an xcom with the job, then one task that checks the progress
>>>> of the job, but that feels very cumbersome to double the number of tasks
>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>> second task fails...
>>>>
>>>> Thanks again,
>>>>
>>>> Furcy
>>>>
>>>>
>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]>
>>>> wrote:
>>>>
>>>> I think we've discussed several approaches like that, and using the XCom
>>>> name (which for many people would mean "let's just extend the XCom table
>>>> for that") is not a very good idea IMHO. I think this is very
>>>> different functionality/logic which we might or might not agree to
>>>> implement as a community. Naming it "XCom" and trying to extend the XCom
>>>> table behavior might be problematic.
>>>>
>>>> Not sure if you are aware but we had a very similar discussion about it
>>>> recently (without clear conclusions but at least you can see what kind of
>>>> issues/problems different people have with this approach)
>>>>
>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>
>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>> intra-task communication mechanism, indeed. But it can very easily lead
>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>> after the latest discussion kind of died out, and I have one possible
>>>> solution to the problem.
>>>>
>>>> Let me explain what I think about it (but others can have different
>>>> opinions of course):
>>>>
>>>> So far the discussion was that there are several ways to achieve what
>>>> you want (and it's really about what entity is providing the "idempotency"
>>>> guarantee):
>>>>
>>>> 1) Similarly to what was just merged in the BigQuery Insert Job
>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>> job_id from outside. You'd need to work out the job_id naming that works in
>>>> your case and make sure that when you re-run your task with the same
>>>> (dag_id, task_id, execution_date) you will get the same id. Then the
>>>> "uniqueness", and thus idempotency, is handled by the logic written in the DAG.
>>>>
>>>> 2) Store the job id in some external storage (via one of the hooks -
>>>> where it can be queried in the way that will work for you). Then the
>>>> idempotency is actually handled by the logic in your Operator + some
>>>> external storage.
>>>>
>>>> 3) Query your service and retrieve the JOB ID from it - but you have to
>>>> have a way to query for the job related to your "dag_id + task_id
>>>> + execution_date". Then the idempotency is actually handled by the
>>>> Service you are using.
>>>>
>>>> In the use case you describe, this is the only thing you need: an
>>>> "idempotency source". I believe you would like to get the case 2) from
>>>> above but without having to use external storage to store the "unique id".
>>>> Something that will let each task in the same dag run to set or retrieve a
>>>> unique value for that particular task. One value should be enough -
>>>> assuming that each operator/task works on one external data "source".
>>>>
>>>> My current thinking is:
>>>>
>>>> Why don't we provide such a dedicated idempotency service inside
>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>> class with two methods:
>>>>
>>>>   * .set(id: str) and
>>>>   * .get() -> str
>>>>
>>>> And the data stored there should be a string keyed by "(dag_id, task_id,
>>>> execution_date)" - available also via Jinja templating. There is no
>>>> intra-task communication, here, very little possibility of abuse and it
>>>> seems to solve the major pain point where you have to provide your own
>>>> storage to get the idempotency if your service does not provide one or you
>>>> do not want to delegate it to the DAG writer.
>>>>
>>>> J.
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:
>>>>
>>>> The use case I'm referring to is that you can't use xcom to let a task
>>>> read information from its past attempts, because when a task starts its
>>>> xcom is automatically deleted.
>>>>
>>>> My specific use case is that we have a custom LivyOperator that calls
>>>> Livy to start batch Spark Jobs.
>>>> When you start a batch job, Livy returns a job_id.
>>>> Sometimes our operator can fail for one reason or another (for instance
>>>> if Livy is unreachable for a while).
>>>> When the task retries, it calls Livy again, which starts the same spark
>>>> job, but the problem is that the spark job from the first attempt can still
>>>> be running,
>>>> and then we have a batch job that runs twice simultaneously and creates
>>>> duplicates in the output.
>>>>
>>>> What we tried to do is getting the job_id from the first try, to check
>>>> if the job is still running, and wait for it to complete if it is.
>>>>
>>>> We tried using xcom to let the task send a message to itself (to its
>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>> doesn't work and is not intended to work.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>>>>
>>>> Hi Furcy,
>>>>
>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>> Depending on your use case this may already be possible.
>>>>
>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I would like to open a feature request for Airflow to support
>>>> "intra-task xcom".
>>>>
>>>> It seems that there are several distinct use cases for it already
>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>
>>>> I wanted to summarize links to the use cases and past attempts,
>>>> and the recommended approach (which apparently would be to create
>>>> a distinct feature from xcom to support this, it could be called
>>>> intra-com or self-com ?)
>>>>
>>>> Do you know if such ticket already exists? I couldn't find one.
>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>> email).
>>>>
>>>> Thanks,
>>>>
>>>> Furcy
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Jarek Potiuk
>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>
>>>> M: +48 660 796 129 <+48660796129>
>>>>
>>>>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
>
>
