I think no-one wants to remove xcom really :)

On Tue, Jun 2, 2020 at 10:32 AM Michael Lutz <[email protected]>
wrote:

> please do not remove existing xcom functionality.   I am using it
> extensively.   If you implement something more robust or elegant, that
> would be fine.  I feel that a more robust state management system would be
> helpful it feels like an area of improvement.
>
> On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <[email protected]>
> wrote:
>
>> I think this subject came so often, that I also change my mind slowly in
>> favor of making an explicit state persistence "service".
>>
>> Whether it's only one key or more, it's secondary, I think but if users
>> are already using Variables to keep state for tasks - this is a clear sign
>> that we miss a crucial feature and our users are abusing Airflow already in
>> the way we try to prevent by not introducing "State service".
>>
>> With the recent SecretBackend implementation where Variables might be
>> kept in a Secret backend - not only MetaStore - potentially you might have
>> no write access to the backend. There is even no "write" support in the
>> current "MetastoreBackend" implementation for writing variables. So we
>> already have configurations where if we try to write variables and read it
>> elsewhere might not work - as far as I can see. You can set several
>> backends of course and the Metastore as the last fallback of course, but
>> for me, it opens up different problems - what happens if the key is present
>> in both, tasks writes it to metastore, but another task reads it from the
>> Secret Backend.
>>
>> I think it seems that variables are being abused in exactly the way we
>> want to prevent the "StateService" to be abused - and shying away from that
>> is really like closing our eyes and pretending it's not happening.
>>
>> So maybe we can make a change AIP with this approach:
>>
>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>> configuration shared between workers (but not on a task level).
>> 2) StateService (or wherever we call it) where we keep state information
>> for specific dag + task + execution_date.
>>
>> J.
>>
>>
>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <[email protected]>
>> wrote:
>>
>>> Airflow already provides a mechanism for state persistence: the
>>> Variable, and, with caveats and flaws, XCom.
>>>
>>> I personally persist state to the airflow metastore database for a large
>>> percentage of our jobs.  They are incremental jobs and it is helpful to
>>> keep track of watermark.
>>>
>>> I think that incremental jobs are probably very very common in airflow
>>> implementations.  Though probably often times users resort to imperfect
>>> vehicles for this such as `execution_date` or xcom.
>>>
>>> I have a very draftey draft aip that i haven't had enough time to work
>>> on, which explores adding explicit support for state persistence to
>>> airflow:
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>> ready for primetime.)
>>>
>>> I am of the mind that being able to persist some state is not a
>>> fundamental change to airflow and would just add explicit (and more
>>> user-friendly) support for something that is already quite common, and fits
>>> fully within the wheelhouse of what airflow exists to do.
>>>
>>>
>>>
>>>
>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]> wrote:
>>>
>>>> Furcy,
>>>>
>>>> To clarify, when I say that Airflow should not be in the business of
>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>> keeping state to be shared between task instances. I completely understand
>>>> that there may be external systems that are harder to work with, and like
>>>> in your case require the operator to be able to store some piece of
>>>> information to make them idempotent. I just don't think that Airflow should
>>>> provide that storage mechanism.
>>>>
>>>> I would think that most users of Airflow have access to some sort of
>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>> easy enough to write your job_id or whatever value you care about to a file
>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>> combination of them you care about. Yes it makes your operators more
>>>> complex and they have to know about another system, but it keeps that
>>>> complexity out of core Airflow. That's the trade off.
>>>>
>>>> Ash,
>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>> there are use cases where it makes some things convenient. In your example
>>>> though, it would be just as easy for the sensor to write the found object
>>>> path as the contents of another file in S3, with a computable prefix based
>>>> on the dag/task/execution_date.
>>>>
>>>>
>>>> At its heart XCom is just a key-value store where the keys are limited
>>>> to a very specific set of possibilities, and where key-value pairs are
>>>> managed in some specific ways. The request here is to add another narrowly
>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>> management of them. The only real advantage of using the Airflow database
>>>> for XCom or any expansion/variation on it is that we know that all
>>>> operators have access to the database.
>>>>
>>>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>>>> as high volume key value stores. Does anyone actually use XCom at scale,
>>>> and does that extra load on the database impact scheduling and other
>>>> performance aspects of Airflow?
>>>>
>>>> Chris
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]>
>>>> wrote:
>>>>
>>>>> Just to touch on one point about XCom, and to re-assure people that
>>>>> they, or something very like them are in Airflow for the foreseeable 
>>>>> future.
>>>>>
>>>>> As an example of an appropriate use for XCom: Let's say a third party
>>>>> delivers you a set of files once a week, but the exact name of the files
>>>>> isn't known (by you) in advance. So you write a sensor that polls/checks 
>>>>> S3
>>>>> for the Objects to appear in our bucket, and the sensor outputs the S3
>>>>> Object path to XCom, that then next processing step then examines to
>>>>> process the files.
>>>>>
>>>>> That sort of use case is not going anywhere.
>>>>>
>>>>> Cheers,
>>>>> -ash
>>>>>
>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:
>>>>>
>>>>> At the risk of repeating myself (from the previous thread that touched
>>>>> on this topic), I don't think Airflow should be in the business of keeping
>>>>> state about external systems. Airflow is about authoring and running
>>>>> workflows; it's not a messaging tool or a cluster management tool. I'm not
>>>>> convinced that the existing XCom functionality should really be a part of
>>>>> Airflow, and I certainly don't think it should be expanded upon or new
>>>>> variations added. I think storing state is especially risky, if for no
>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>> those systems. It's very likely that at some times the "state" that 
>>>>> Airflow
>>>>> has saved will diverge from the actual state of the external system.
>>>>> Handling that nicely, probably requires a bunch of custom code in the
>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>> operator code complexity. Users would be much better served going to the
>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>> lacking in features (like being able to query the status of a particular
>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>> making up for failures of other tools.
>>>>>
>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>> extra features from expanding in scope. Jarek proposed something that 
>>>>> would
>>>>> just store a single string, and immediately Furcy wants to expand it to
>>>>> store multiple strings. Either way we are really just talking about a
>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>> proposal).
>>>>>
>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>> cluster across multiple data intervals, if one was already running (this
>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>> can equally see use cases where I might want to share some resource for
>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if 
>>>>> we
>>>>> added this then why limit it to any one of those combinations? But then we
>>>>> just have an arbitrary key-value store. If you want to use Airflow for 
>>>>> that
>>>>> then you can use Variables, if you want to use something else then you 
>>>>> can.
>>>>>
>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>> some way (like it does with clearing out XCom's on reruns), then I see
>>>>> absolutely no added benefit. And even with some potential management by
>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:
>>>>>
>>>>> Thank you Jarek for the detailed explanation,
>>>>>
>>>>> That's exactly what I wanted to do: write a feature request to
>>>>> summarize all those discussions.
>>>>> I agree with you that the feature should be marked distinct from the
>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>
>>>>> The crux of the problem, I think is that with XCom you do want the
>>>>> task to delete it's xcom on the beginning of the retry.
>>>>> Correct me if I'm wrong but one use cases where it was necessary was
>>>>> having a task A and a task B that starts immediately after A, and wait 
>>>>> from
>>>>> some 'signal' from A.
>>>>> If A and B restart and A doesn't reset it's signal, then B will use
>>>>> the signal from A's first try, which is incorrect.
>>>>>
>>>>> About the 3 solutions you mention:
>>>>>
>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>> use-case Livy's API is poorly designed and only returns a generated 
>>>>> job_id,
>>>>> you can't specify a custom one.
>>>>> You can't even find a job by name, I would have to list all the active
>>>>> job_ids, and do a GET for each of them to get it's name and find which one
>>>>> is the one I want. It's doable but inelegant.
>>>>>
>>>>> 2) Store the id in an external storage. Of course it would work but it
>>>>> requires an external storage. More on that below.
>>>>>
>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>> think you mean that the idempotency can be handled by the service you call
>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>> could have something to prevent a job that run twice from creating
>>>>> duplicates. This is another solution we are considering, but it is
>>>>> coslty to change now.
>>>>>
>>>>> You guess correctly that the feature I was asking for me would be to
>>>>> provide some utility to let the users implement solution 2) without
>>>>> requiring an external storage.
>>>>> I think it would be a QOL improvement for some use cases, just like it
>>>>> could be argued that XCom is just a QOL improvement and users could have
>>>>> used an external storage themselves.
>>>>> The main advantage that it brings is making the custom operators much
>>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>>> having to set up some external
>>>>> storage.
>>>>>
>>>>> I have seen that some users used the metadata store itself as an
>>>>> external storage by adding a new table to the airflow model:
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e
>>>>>
>>>>> And others suggested using XCom itself as an external storage by
>>>>> storing information with a special task_id:
>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>
>>>>> In the discussion thread you provided it was also suggested to use
>>>>> Variables to store some persisting information.
>>>>>
>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>> providing such functionality would be good.
>>>>>
>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>> extent, providing a "IdempotencyIdStorage" that only allows you to store a
>>>>> string
>>>>> will just force people who need to store more than one id for one task
>>>>> (for whatever reason) to use some hack again, like storing a json inside
>>>>> the storage.
>>>>>
>>>>> I was more thinking about something quite similar to XCom (I liked the
>>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>> task_id, execution_date, key)"
>>>>> where "key" can be whatever you want and would be kept across retries.
>>>>>
>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>> Indeed it looks like there would be many ways to misuse such feature.
>>>>> I do understand the important of idempotency, and it looks like my use
>>>>> case is one of the first ever listed where I do need to persist a state
>>>>> across retries to make my operator really idempotent.
>>>>>
>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>> another solution).
>>>>>
>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>> mechanism to prevent double-writes,
>>>>> but I think that as the above hacks show, you can't really prevent
>>>>> users from shooting themselves in the foot if that's what they really want
>>>>> to.
>>>>>
>>>>> While I do think that making things foolproof is important, I believe
>>>>> it's also in Python's philosophy to *not* make things foolproof at
>>>>> the detriment of simplicity for the right use cases.
>>>>> But I do understand that the use cases are different and
>>>>> contradictory: some would require the state to be persisted across
>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>> across retries and not reschedule.
>>>>>
>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>> the submit and an xcom with the job, then one task that check the progress
>>>>> of the job, but that feels very cumbersome to double the number of tasks
>>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>>> second task fails...
>>>>>
>>>>> Thanks again,
>>>>>
>>>>> Furcy
>>>>>
>>>>>
>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]>
>>>>> wrote:
>>>>>
>>>>> I think we've discussed several approaches like that and using Xcom
>>>>> name (which for many people would mean "let's just extend XCom table for
>>>>> that" is not a very good idea to use it IMHO. I think this is very
>>>>> different functionality/logic which we might or might not agree to
>>>>> implement as a community. Naming it "Xcom" to trying to extend the XCom
>>>>> table behavior might be problematic.
>>>>>
>>>>> Not sure if you are aware but we had very similar discussion about it
>>>>> recently (without clear conclusions but at least you can see what kind of
>>>>> issues/problems different people have with this approach)
>>>>>
>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>
>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>> - intra-task communication mechanism, indeed. But it can very easily lead
>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) 
>>>>> that
>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>> after the latest discussion kind of died out, and I have one possible
>>>>> solution to the problem.
>>>>>
>>>>> Let me explain what I think about it (but others can have different
>>>>> opinions of course):
>>>>>
>>>>> So far the discussion was that there are several ways to achieve what
>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>> guarantee:
>>>>>
>>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>>> job_id from outside. You'd need to work out the job_id naming that works 
>>>>> in
>>>>> your case and make sure that when you re-run your task with the same
>>>>> (dag_id, task_id, execution date) you will get the same id. Then the
>>>>> "uniqueness" thus idempotency is handled by the logic written in the DAG.
>>>>>
>>>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>>>> where it can be queried in the way that will work for you). Then the
>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>> external storage.
>>>>>
>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>> to have a way to query for the job related to your "dag id  + task
>>>>> + execution_date". Then - the idempotency is actually handling by the
>>>>> Service you are using.
>>>>>
>>>>> In the use case, you describe - this is the only thing you need -
>>>>> "idempotency source". I believe you would like to get the case 2) from
>>>>> above but without having to use external storage to store the "unique id".
>>>>> Something that will let each task in the same dag run to set or retrieve a
>>>>> unique value for that particular task. One value should be enough -
>>>>> assuming that each operator/task works on one external data "source".
>>>>>
>>>>> My current thinking is:
>>>>>
>>>>> Why don't we provide such a dedicated, idempotency service inside
>>>>> Airflow? We already have a DB and we could have an"IdempotencyIdStorage"
>>>>> class with two methods:
>>>>>
>>>>>   * .set(id: str) and
>>>>>   * .get() -> str
>>>>>
>>>>> And the data stored there should be a string keyed by "dag_id,
>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>> no intra-task communication, here, very little possibility of abuse and it
>>>>> seems to solve the major pain point where you have to provide your own
>>>>> storage to get the idempotency if your service does not provide one or you
>>>>> do not want to delegate it to the DAG writer.
>>>>>
>>>>> J.
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:
>>>>>
>>>>> The use case I'm referring to is that you can't use xcom to let a task
>>>>> read information from it's past attempts, because when a task starts it's
>>>>> xcom is automatically deleted.
>>>>>
>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>> Livy to start batch Spark Jobs.
>>>>> When you start a batch job Livy returns a job_id
>>>>> Sometimes our operator can fail for one reason or another (for
>>>>> instance if Livy is unreachable for a while)
>>>>> When the task retries, it calls Livy again, which start the same spark
>>>>> job, but the problem is that the spark job from the first attempt can 
>>>>> still
>>>>> be running,
>>>>> and then we have a batch job that runs twice simultaneously and
>>>>> creates duplicates in the output.
>>>>>
>>>>> What we tried to do is getting the job_id from the first try, to check
>>>>> if the job is still running, and wait for it to complete if it is.
>>>>>
>>>>> We tried using xcom to let the task send a message to itself (to it's
>>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>>> doesn't work and is not intended to work.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>>>>>
>>>>> Hi Furcy,
>>>>>
>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>> Depending your use case this may already be possible.
>>>>>
>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I would like to open a feature request for Airflow to support
>>>>> "intra-task xcom".
>>>>>
>>>>> It seems that there are several distinct use cases for it already
>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>
>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>> and the recommended approach (which apparently would be to create
>>>>> a distinct feature from xcom to support this, it could be calle
>>>>> intra-com or self-com ?)
>>>>>
>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>> email).
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Furcy
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Jarek Potiuk
>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>
>>>>> M: +48 660 796 129 <+48660796129>
>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>
>>>>>
>>
>> --
>>
>> Jarek Potiuk
>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>
>> M: +48 660 796 129 <+48660796129>
>> [image: Polidea] <https://www.polidea.com/>
>>
>>

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Reply via email to