Just opening the Pandora's box :).

I personally think we should not keep state between retries. It opens up
possibilities to whole host of problems and edge cases, and allows people
to solve some approaches in non-airflow'y ways - losing some important
properties (mainly idempotency). Tasks in Airflow should be idempotent and
stateless from the operator's author point of view).

I think there is quite a big conceptual difference between keeping the
reschedule state (it's just optimising of execution of the same task) and
keeping state between retries.

Right now when you write your operator it's simple - no state to handle.
XComs (and everything else) is cleared when task is re-run.
With Poke reschedule proposal - the only thing you can do is to
save/retrieve a single ID attached to the current task instance. This id
will not be cleared on reschedule, but it will be cleared on retry.

If we introduce saving state on retries, it opens up a lot of questions -
should we keep all retries? or just one? What data should we keep - should
we allow more structured data? What guidelines should people follow when
writing their operators ? And it's a totally different feature that should
be discussed separately.

J.


On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <
damian.sha...@credit-suisse.com> wrote:

> I just wanted to add a related use case is task retries, there are lots of
> scenarios where keeping state between the retries as well as the
> reschedules would be extremely helpful, so as long as whatever the solution
> is isn't overly narrow I'd be extremely appreciative.
>
> Damian
>
> -----Original Message-----
> From: Jarek Potiuk <jarek.pot...@polidea.com>
> Sent: Friday, January 10, 2020 11:05
> To: dev@airflow.apache.org
> Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> Rescheduling in Operators
>
> Also another point to discuss here. As an original author of the idea of
> using prefix in xcom, I think after the discussions I changed my mind. I
> think that simply adding a field to an existing table (TaskReschedule?)
> where we could keep all the data that need to be persisted, seems to be a
> good idea. We do not impact performance too much (the table is already
> queried) , we do not add too much complexity and we do not try to introduce
> a generic "state" storage - this would be a solution dedicated to only
> handle rescheduling.
>
> On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko <fo...@driesprong.frl>
> wrote:
>
> > The repoke logic as it is now implemented with the sensor, is able to
> > recover from an unexpected crash. After each poke, it will just go to
> > sleep. If the process crashes in between, it might become a zombie
> > task in the end, but this is also taken care of by the scheduler. In
> > this case, the scheduler thinks the task is still running, but in
> reality, it crashes.
> > There is a timeout that will reset the execution. Hopefully, this
> > doesn't happen often, and should only occur when something is off (for
> > example a machine crashed, or a network partition, etc). HTH
> >
> > Personally I don't like duplicating the same table for such a similar
> > use case. But that's a design choice I guess.
> >
> > If we go for the async executor, the above might be different. I think
> > it is good to not include this in the discussion.
> >
> > Cheers, Fokko
> >
> >
> > Op do 9 jan. 2020 om 19:33 schreef Darren Weber <
> > dweber.consult...@gmail.com
> > >:
> >
> > > Not sure whether to add to this email thread or the google-doc (not
> > > sure
> > if
> > > that google-doc is just a meeting-notes or if it should evolve into
> > > a
> > spec
> > > :grin:).
> > >
> > > Maybe a stupid suggestion, but here it is anyway:
> > >
> > > XCom - communication between elements of a DAG
> > >
> > > XState - key/value store available for each element of a DAG
> > >
> > > Clearly separate the behavior of a stateful resource (XState) from
> > > one
> > that
> > > is not intended to be stateful (XCom), if that makes any sense?
> > (Creating
> > > a new XState feature is similar to a new db-table, I guess.)
> > >
> > > Just to explain what I understand about the goals of how Airflow
> > > should behave when it has some ability for an operator to reschedule
> > > pokes and
> > the
> > > scope of the changes.  In the big picture, it's important that
> > > Airflow
> > can
> > > resurrect a DAG on a restart when some elements of the DAG contain
> > > operators/sensors that are dependent on external cloud operations (e.g.
> > AWS
> > > Batch).  This is feasible when Airflow can persist any unique job-ID
> > > defined by the external job provider (e.g. AWS Batch "jobId") and
> > > any related identifiers for the job (e.g. AWS Batch infrastructure
> > > ARNs for batch queue/compute-env etc and all of this detail is
> > > captured in the AwsBatchOperator already).  Assuming Airflow runs a
> > > DAG that spins up
> > 100's
> > > or 1000's of such external jobs and persists the external "jobId",
> > > when Airflow crashes or is stopped for upgrades etc. and restarted,
> > > the operators that submitted the jobs should be able to try to check
> > > on the state of those previously submitted jobs.  If the jobs are
> > > still running
> > on
> > > the external provider (e.g. AWS Batch), it should be able to resume
> > > monitoring (poking) the job status without re-submitting a duplicate
> > > job (also any failure to poke a job should have some level of
> > > poke-retry behavior that does not immediately fail the Airflow task
> > > that results in somehow re-submitting the same job that is already
> > > running).  So, in that context, what is the scope of the
> > > "reshedule-poke" changes - do they
> > simply
> > > release a worker and so long as Airflow is "up" (has not crashed),
> > > the reschedule can resume poking, but if Airflow crashes, the whole
> > > thing starts over again because the state of the task is not
> > > resilient to
> > Airflow
> > > crashing?  Or, does the work on the "reschedule-poke" also provide
> > > resilience when Airflow crashes?  If the goal is to be resilient to
> > Airflow
> > > crashes, what is required for the "reschedule-poke" work to
> > > accomplish
> > that
> > > goal, if it doesn't already?  (Would the architecture for Airflow
> > > resilience be out-of-scope in this context because it involves more
> > > complexity, like a Kafka cluster?)
> > >
> > > -- Darren
> > >
> > >
> > >
> > > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk
> > > <jarek.pot...@polidea.com>
> > > wrote:
> > >
> > > > Commented as well. I think we are really going in a good direction!
> > > >
> > > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko
> > > > <fo...@driesprong.frl
> > >
> > > > wrote:
> > > >
> > > > > Thanks Jacob for building the document. I think we're on the
> > > > > right
> > > track.
> > > > > I've added some comments and clarification to the document, to
> > validate
> > > > > we're looking in the same direction. Would love to get more
> > > > > people's opinion on this.
> > > > >
> > > > > Cheers, Fokko
> > > > >
> > > > > Op wo 8 jan. 2020 om 03:31 schreef Jacob Ferriero
> > > > > <jferri...@google.com.invalid>:
> > > > >
> > > > > > Image not working on dev list here is link to the github
> > > > > > review
> > > comment
> > > > > > containing said image:
> > > > > > https://github.com/apache/airflow/pull/6370#issuecomment-54658
> > > > > > 2724
> > .
> > > > > >
> > > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <
> > jferri...@google.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hello Dev List,
> > > > > >>
> > > > > >> The inspiration for this is to allow operators to start a
> > > > > >> long
> > > running
> > > > > >> task on an external system and reschedule pokes for
> > > > > >> completion
> > (e.g
> > > > > spark
> > > > > >> job on dataproc), instead of blocking a worker (sketched out
> > > > > >> in
> > > #6210
> > > > > >> <https://github.com/apache/airflow/pull/6210>) to allow
> > > > > >> freeing
> > up
> > > of
> > > > > >> slots between pokes. To do this requires supporting a method
> > > > > >> for
> > > > storing
> > > > > >> task state between reschedules.
> > > > > >> It's worth noting that a task would maintain state only
> > > > > >> during reschedules but clear state on retries. In this way
> > > > > >> the task is
> > > > > idempotent
> > > > > >> before reaching a terminal state [SUCCES, FAIL, UP_FOR_RETRY].
> > This
> > > > > brings
> > > > > >> up a question of the scope of commitment to idempotency of
> > > operators.
> > > > > If it
> > > > > >> is deemed acceptable for reschedules to maintain some state,
> > > > > >> then
> > we
> > > > can
> > > > > >> free up workers between pokes.
> > > > > >>
> > > > > >> Because this is very similar to the purpose of XCom it's been
> > > > postulated
> > > > > >> that we should support this behavior in XCom rather than
> > > > > >> provide a
> > > new
> > > > > >> model in the db for TaskState. (Though discussion here on
> > > > > >> which is
> > > > more
> > > > > >> appropriate is more than welcome.)
> > > > > >>
> > > > > >> I'd like to put forward a proposal to resurrect the reverted
> > > > > >> #6370 <https://github.com/apache/airflow/pull/6370> in order
> > > > > >> to
> > provide a
> > > > > >> modification to the lifetime of XComs under certain conditions.
> > The
> > > > > diagram
> > > > > >> below helps illustrate the change originally proposed in #6370.
> > > There
> > > > > was
> > > > > >> concern about changing existing behavior (potentially
> > > > > >> breaking)
> > and
> > > > the
> > > > > >> fact that this makes operators stateful. Per the review
> > > > > >> comments
> > and
> > > > an
> > > > > >> informal discussion (meetings notes <
> > > > >
> > > >
> > >
> > https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0oses
> > PG7jVZ3oU/edit#
> > > > > >
> > > > > >> and #sig-async-operators) I'd like to modify the approach
> > > > > >> #6370 to
> > > > only
> > > > > >> skip clearing of XCom if the Xom key is prefixed with
> > > > > >> `airflow.models.xcom.DO_NOT_CLEAR_PREFIX = "_STATEFUL_"` or
> > similar.
> > > > > >>
> > > > > >> [image: image.png]
> > > > > >> --
> > > > > >>
> > > > > >> *Jacob Ferriero*
> > > > > >>
> > > > > >> Strategic Cloud Engineer: Data Engineering
> > > > > >>
> > > > > >> jferri...@google.com
> > > > > >>
> > > > > >> 617-714-2509 <(617)%20714-2509>
> > > > > >>
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > *Jacob Ferriero*
> > > > > >
> > > > > > Strategic Cloud Engineer: Data Engineering
> > > > > >
> > > > > > jferri...@google.com
> > > > > >
> > > > > > 617-714-2509
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Jarek Potiuk
> > > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > > >
> > > > M: +48 660 796 129 <+48660796129>
> > > > [image: Polidea] <https://www.polidea.com/>
> > > >
> > >
> > >
> > > --
> > > Darren L. Weber, Ph.D.
> > > http://psdlw.users.sourceforge.net/
> > > http://psdlw.users.sourceforge.net/wordpress/
> > >
> >
>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
> [image: Polidea] <https://www.polidea.com/>
>
>
>
> ===============================================================================
>
> Please access the attached hyperlink for an important electronic
> communications disclaimer:
> http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> ===============================================================================
>
>


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Reply via email to