Thank you, Jarek, for the detailed explanation.

That's exactly what I wanted to do: write a feature request to summarize
all those discussions.
I agree with you that the feature should be kept distinct from the XCom
feature and that we should not piggyback it onto XCom.

The crux of the problem, I think, is that with XCom you do want the task to
delete its XCom entries at the beginning of a retry.
Correct me if I'm wrong, but one use case where this was necessary was
having a task A and a task B that starts immediately after A and waits for
some 'signal' from A.
If A and B restart and A doesn't reset its signal, then B will use the
signal from A's first try, which is incorrect.
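
To make that concrete, here is a minimal sketch of task B's side (the task
ids and the key are hypothetical): B polls A's XCom, so if A's value from a
failed first try were not cleared on retry, B would pick up the stale signal
and start too early.

    # Inside task B's python_callable (hypothetical task ids and key).
    # If A's XCom survived A's retry, this would return the stale value
    # from A's first try and B would proceed on outdated information.
    def wait_for_signal(**context):
        signal = context["ti"].xcom_pull(task_ids="task_a", key="signal")
        if signal is None:
            raise RuntimeError("task_a has not published its signal yet")
        return signal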

About the 3 solutions you mention:

1) Providing the job_id from outside. That works indeed. Sadly, in my
use case Livy's API is poorly designed: it only returns a generated job_id,
and you can't specify a custom one.
You can't even look up a job by name; I would have to list all the active
job_ids and do a GET on each of them to read its name and find the one I
want (see the sketch after this list). It's doable, but inelegant.

2) Store the id in external storage. Of course that would work, but it
requires setting up an external storage. More on that below.

3) I'm not sure I completely understand what you mean there, but I think
you mean that idempotency can be handled by the service you call (for
instance BigQuery). Indeed, that is another solution. If we were using Spark
with a Hive metastore + locking, or the Delta Lake storage format, we could
have something to prevent a job that runs twice from creating duplicates.
This is another solution we are considering, but it is costly to change now.
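
To illustrate why point 1) is clunky with Livy: since you cannot choose the
job_id, the best you can do is derive a unique job *name* from (dag_id,
task_id, execution_date) and then scan the active batches to find it. A
rough sketch (the endpoint paths and the "name" field are from memory, and
the naming convention is just an example):

    import requests

    LIVY_URL = "http://livy:8998"  # hypothetical Livy endpoint

    def find_batch_id_by_name(expected_name):
        """List the active Livy batches and GET each one until we find the
        batch whose name matches the one derived from
        (dag_id, task_id, execution_date). Returns None if nothing matches."""
        sessions = requests.get(LIVY_URL + "/batches").json().get("sessions", [])
        for session in sessions:
            batch = requests.get("{}/batches/{}".format(LIVY_URL, session["id"])).json()
            if batch.get("name") == expected_name:
                return batch["id"]
        return None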

You guessed correctly: the feature I was asking for would be to provide
some utility to let users implement solution 2) without requiring an
external storage.
I think it would be a QOL improvement for some use cases, just like it
could be argued that XCom is just a QOL improvement and users could have
used an external storage themselves.
The main advantage it brings is making custom operators much easier to
share and reuse across the Apache Airflow community, compared to having to
set up some external storage.

I have seen that some users used the metadata store itself as an external
storage by adding a new table to the airflow model:
http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e

And others suggested using XCom itself as an external storage by storing
information with a special task_id:
https://stackoverflow.com/a/57515143/2087478
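
For reference, that workaround boils down to writing under a task_id that no
real task owns, so the XCom cleanup that runs when the task restarts never
touches it. A rough sketch, assuming the XCom model's set/get_one
classmethods in Airflow 1.10 (the "__state" suffix and key are my own
convention):

    from airflow.models import XCom

    def remember_job_id(context, job_id):
        # Store the job_id under a fake task_id ("<task_id>__state") so the
        # automatic XCom cleanup of the real task at retry time never deletes it.
        ti = context["ti"]
        XCom.set(
            key="livy_job_id",
            value=job_id,
            task_id=ti.task_id + "__state",
            dag_id=ti.dag_id,
            execution_date=ti.execution_date,
        )

    def recall_job_id(context):
        ti = context["ti"]
        return XCom.get_one(
            execution_date=ti.execution_date,
            key="livy_job_id",
            task_id=ti.task_id + "__state",
            dag_id=ti.dag_id,
        )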

In the discussion thread you provided, it was also suggested to use
Variables to store persistent information.
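
The Variable variant is the same idea with a composite key; a minimal sketch
(the key format is just a convention I made up):

    from airflow.models import Variable

    def state_key(ti):
        # Hypothetical convention: one Variable per (dag_id, task_id, execution_date).
        return "livy_job_id__{}__{}__{}".format(
            ti.dag_id, ti.task_id, ti.execution_date.isoformat())

    # First try:  Variable.set(state_key(ti), job_id)
    # On a retry: previous_job_id = Variable.get(state_key(ti), default_var=None)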

These 3 approaches work, but they feel quite "hacky", and I believe that
providing such functionality natively would be valuable.

Finally, I don't see the point of limiting the functionality to such an
extent: providing an "IdempotencyIdStorage" that only allows you to store a
single string will just force people who need to store more than one id for
one task (for whatever reason) to resort to some hack again, like storing a
JSON document inside the storage.

I was thinking more of something quite similar to XCom (I liked the
XState name suggestion), where the entry would be keyed by "(dag_id,
task_id, execution_date, key)",
where "key" can be whatever you want and the entry would be kept across retries.

I have read (quickly) through the "Pandora's Box" thread you linked. Indeed,
it looks like there would be many ways to misuse such a feature.
I do understand the importance of idempotency, and it looks like my use case
is one of the first ever listed where I do need to persist state
across retries to make my operator truly idempotent.

I'm surprised no one has run into it before, given how frequent the Spark +
Airflow combination is (well, the BigQueryOperator was such a case too, but
it found another solution).

Of course we can blame it on Livy for being poorly designed (unlike
BigQuery), or we can blame it on Spark for not having a built-in safety
mechanism to prevent double writes,
but I think that, as the above hacks show, you can't really prevent users
from shooting themselves in the foot if that's what they really want to do.

While I do think that making things foolproof is important, I believe it's
also in Python's philosophy to *not* make things foolproof to the detriment
of simplicity for the right use cases.
But I do understand that the use cases are different and contradictory:
some would require the state to be persisted across reschedules but not
retries, while mine would require the state to be persisted across retries
but not reschedules.

Maybe the Airflow-y way to do this would be to have one task that does the
submit and pushes the job_id to XCom, then another task that checks the
progress of the job, but doubling the number of tasks just for that feels
very cumbersome. Plus, I'm not sure we could make the first task retry if the
second task fails...
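
For completeness, here is roughly what that two-task version would look like
(the Livy helpers are hypothetical stubs; the open question is whether the
submit task can be retried when the wait task fails):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG("livy_submit_and_wait", start_date=datetime(2020, 6, 1),
              schedule_interval=None)

    def submit_livy_batch():
        """Hypothetical helper: POST /batches to Livy, return the generated batch id."""
        raise NotImplementedError

    def poll_livy_until_done(job_id):
        """Hypothetical helper: poll the batch's state until it reaches a terminal state."""
        raise NotImplementedError

    def submit_job(**context):
        job_id = submit_livy_batch()
        return job_id  # the returned value is pushed to XCom as "return_value"

    def wait_for_job(**context):
        job_id = context["ti"].xcom_pull(task_ids="submit_livy_job")
        poll_livy_until_done(job_id)

    submit = PythonOperator(task_id="submit_livy_job", python_callable=submit_job,
                            provide_context=True, dag=dag)
    wait = PythonOperator(task_id="wait_for_livy_job", python_callable=wait_for_job,
                          provide_context=True, dag=dag)
    submit >> wait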

Thanks again,

Furcy


On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]> wrote:

> I think we've discussed several approaches like that, and using the XCom
> name (which for many people would mean "let's just extend the XCom table for
> that") is not a very good idea IMHO. I think this is very
> different functionality/logic which we might or might not agree to
> implement as a community. Naming it "XCom" and trying to extend the XCom
> table behavior might be problematic.
>
> Not sure if you are aware, but we had a very similar discussion about it
> recently (without clear conclusions but at least you can see what kind of
> issues/problems different people have with this approach)
>
> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>
> I am not saying it is impossible to do, but I think it's a matter of how
> we formulate the "use case". It's very tempting to implement a generic -
> intra-task communication mechanism, indeed. But it can very easily lead to
> people abusing it and bypassing the guarantees (idempotency mainly) that
> Airflow provides for backfilling and re-running tasks. I thought a bit
> after the latest discussion kind of died out, and I have one possible
> solution to the problem.
>
> Let me explain what I think about it (but others can have different
> opinions of course):
>
> So far the discussion was that there are several ways to achieve what you
> want (and it's really about which entity is providing the "idempotency"
> guarantee):
>
> 1) Similarly as just merged in the BigQuery Insert Job
> https://github.com/apache/airflow/pull/8868/files - you can provide
> job_id from outside. You'd need to work out the job_id naming that works in
> your case and make sure that when you re-run your task with the same
> (dag_id, task_id, execution_date) you will get the same id. Then the
> "uniqueness", and thus the idempotency, is handled by the logic written in
> the DAG.
>
> 2) Store the DAG id in some external storage (via one of the hooks - where
> it can be queried in the way that will work for you). Then the idempotency
> is actually handled by the logic in your Operator + some external storage.
>
> 3) Query your service and retrieve the JOB ID from it - but you have to
> have a way to query for the job related to your "dag_id + task_id
> + execution_date". Then the idempotency is actually handled by the
> service you are using.
>
> In the use case you describe, this is the only thing you need - an
> "idempotency source". I believe you would like to get case 2) from
> above, but without having to use external storage to store the "unique id".
> Something that will let each task in the same DAG run set or retrieve a
> unique value for that particular task. One value should be enough -
> assuming that each operator/task works on one external data "source".
>
> My current thinking is:
>
> Why don't we provide such a dedicated idempotency service inside Airflow?
> We already have a DB, and we could have an "IdempotencyIdStorage" class with
> two methods:
>
>   * .set(id: str) and
>   * .get() -> str
>
> And the data stored there should be a string keyed by "(dag_id, task_id,
> execution_date)" - available also via Jinja templating. There is no
> intra-task communication here, very little possibility of abuse, and it
> seems to solve the major pain point where you have to provide your own
> storage to get the idempotency if your service does not provide one or you
> do not want to delegate it to the DAG writer.
>
> J.
>
>
> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:
>
>> The use case I'm referring to is that you can't use XCom to let a task
>> read information from its past attempts, because when a task starts, its
>> XCom is automatically deleted.
>>
>> My specific use case is that we have a custom LivyOperator that calls
>> Livy to start batch Spark jobs.
>> When you start a batch job, Livy returns a job_id.
>> Sometimes our operator can fail for one reason or another (for instance
>> if Livy is unreachable for a while).
>> When the task retries, it calls Livy again, which starts the same Spark
>> job, but the problem is that the Spark job from the first attempt can still
>> be running,
>> and then we have a batch job that runs twice simultaneously and creates
>> duplicates in the output.
>>
>> What we tried to do is get the job_id from the first try, to check if
>> the job is still running, and wait for it to complete if it is.
>>
>> We tried using XCom to let the task send a message to itself (to its
>> next try), but XCom is meant for "inter-task communication" only, so this
>> doesn't work and is not intended to work.
>>
>>
>>
>>
>>
>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>>
>>> Hi Furcy,
>>>
>>> Can you give a concrete example of what you mean by intra-task xcom?
>>> Depending on your use case, this may already be possible.
>>>
>>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>>
>>> Hello,
>>>
>>> I would like to open a feature request for Airflow to support
>>> "intra-task xcom".
>>>
>>> It seems that there are several distinct use cases for it already,
>>> and only ugly workarounds, and I wanted to list them in a JIRA ticket.
>>>
>>> I wanted to summarize links to the use cases and past attempts,
>>> and the recommended approach (which apparently would be to create
>>> a feature distinct from XCom to support this; it could be called
>>> intra-com or self-com?)
>>>
>>> Do you know if such ticket already exists? I couldn't find one.
>>> Also I can't create any ticket due to some obscure bug (see my other
>>> email).
>>>
>>> Thanks,
>>>
>>> Furcy
>>>
>>>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
>
>
