Also another point to discuss here. As an original author of the idea of
using prefix in xcom, I think after the discussions I changed my mind. I
think that simply adding a field to an existing table (TaskReschedule?)
where we could keep all the data that need to be persisted, seems to be a
good idea. We do not impact performance too much (the table is
already queried) , we do not add too much complexity and we do not try to
introduce a generic "state" storage - this would be a solution dedicated to
only handle rescheduling.

On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko <fo...@driesprong.frl>
wrote:

> The repoke logic as it is now implemented with the sensor, is able to
> recover from an unexpected crash. After each poke, it will just go to
> sleep. If the process crashes in between, it might become a zombie task in
> the end, but this is also taken care of by the scheduler. In this case, the
> scheduler thinks the task is still running, but in reality, it crashes.
> There is a timeout that will reset the execution. Hopefully, this doesn't
> happen often, and should only occur when something is off (for example a
> machine crashed, or a network partition, etc). HTH
>
> Personally I don't like duplicating the same table for such a similar use
> case. But that's a design choice I guess.
>
> If we go for the async executor, the above might be different. I think it
> is good to not include this in the discussion.
>
> Cheers, Fokko
>
>
> Op do 9 jan. 2020 om 19:33 schreef Darren Weber <
> dweber.consult...@gmail.com
> >:
>
> > Not sure whether to add to this email thread or the google-doc (not sure
> if
> > that google-doc is just a meeting-notes or if it should evolve into a
> spec
> > :grin:).
> >
> > Maybe a stupid suggestion, but here it is anyway:
> >
> > XCom - communication between elements of a DAG
> >
> > XState - key/value store available for each element of a DAG
> >
> > Clearly separate the behavior of a stateful resource (XState) from one
> that
> > is not intended to be stateful (XCom), if that makes any sense?
> (Creating
> > a new XState feature is similar to a new db-table, I guess.)
> >
> > Just to explain what I understand about the goals of how Airflow should
> > behave when it has some ability for an operator to reschedule pokes and
> the
> > scope of the changes.  In the big picture, it's important that Airflow
> can
> > resurrect a DAG on a restart when some elements of the DAG contain
> > operators/sensors that are dependent on external cloud operations (e.g.
> AWS
> > Batch).  This is feasible when Airflow can persist any unique job-ID
> > defined by the external job provider (e.g. AWS Batch "jobId") and any
> > related identifiers for the job (e.g. AWS Batch infrastructure ARNs for
> > batch queue/compute-env etc and all of this detail is captured in the
> > AwsBatchOperator already).  Assuming Airflow runs a DAG that spins up
> 100's
> > or 1000's of such external jobs and persists the external "jobId", when
> > Airflow crashes or is stopped for upgrades etc. and restarted, the
> > operators that submitted the jobs should be able to try to check on the
> > state of those previously submitted jobs.  If the jobs are still running
> on
> > the external provider (e.g. AWS Batch), it should be able to resume
> > monitoring (poking) the job status without re-submitting a duplicate job
> > (also any failure to poke a job should have some level of poke-retry
> > behavior that does not immediately fail the Airflow task that results in
> > somehow re-submitting the same job that is already running).  So, in that
> > context, what is the scope of the "reshedule-poke" changes - do they
> simply
> > release a worker and so long as Airflow is "up" (has not crashed), the
> > reschedule can resume poking, but if Airflow crashes, the whole thing
> > starts over again because the state of the task is not resilient to
> Airflow
> > crashing?  Or, does the work on the "reschedule-poke" also provide
> > resilience when Airflow crashes?  If the goal is to be resilient to
> Airflow
> > crashes, what is required for the "reschedule-poke" work to accomplish
> that
> > goal, if it doesn't already?  (Would the architecture for Airflow
> > resilience be out-of-scope in this context because it involves more
> > complexity, like a Kafka cluster?)
> >
> > -- Darren
> >
> >
> >
> > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk <jarek.pot...@polidea.com>
> > wrote:
> >
> > > Commented as well. I think we are really going in a good direction!
> > >
> > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko <fo...@driesprong.frl
> >
> > > wrote:
> > >
> > > > Thanks Jacob for building the document. I think we're on the right
> > track.
> > > > I've added some comments and clarification to the document, to
> validate
> > > > we're looking in the same direction. Would love to get more people's
> > > > opinion on this.
> > > >
> > > > Cheers, Fokko
> > > >
> > > > Op wo 8 jan. 2020 om 03:31 schreef Jacob Ferriero
> > > > <jferri...@google.com.invalid>:
> > > >
> > > > > Image not working on dev list here is link to the github review
> > comment
> > > > > containing said image:
> > > > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> .
> > > > >
> > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <
> jferri...@google.com>
> > > > > wrote:
> > > > >
> > > > >> Hello Dev List,
> > > > >>
> > > > >> The inspiration for this is to allow operators to start a long
> > running
> > > > >> task on an external system and reschedule pokes for completion
> (e.g
> > > > spark
> > > > >> job on dataproc), instead of blocking a worker (sketched out in
> > #6210
> > > > >> <https://github.com/apache/airflow/pull/6210>) to allow freeing
> up
> > of
> > > > >> slots between pokes. To do this requires supporting a method for
> > > storing
> > > > >> task state between reschedules.
> > > > >> It's worth noting that a task would maintain state only during
> > > > >> reschedules but clear state on retries. In this way the task is
> > > > idempotent
> > > > >> before reaching a terminal state [SUCCES, FAIL, UP_FOR_RETRY].
> This
> > > > brings
> > > > >> up a question of the scope of commitment to idempotency of
> > operators.
> > > > If it
> > > > >> is deemed acceptable for reschedules to maintain some state, then
> we
> > > can
> > > > >> free up workers between pokes.
> > > > >>
> > > > >> Because this is very similar to the purpose of XCom it's been
> > > postulated
> > > > >> that we should support this behavior in XCom rather than provide a
> > new
> > > > >> model in the db for TaskState. (Though discussion here on which is
> > > more
> > > > >> appropriate is more than welcome.)
> > > > >>
> > > > >> I'd like to put forward a proposal to resurrect the reverted #6370
> > > > >> <https://github.com/apache/airflow/pull/6370> in order to
> provide a
> > > > >> modification to the lifetime of XComs under certain conditions.
> The
> > > > diagram
> > > > >> below helps illustrate the change originally proposed in #6370.
> > There
> > > > was
> > > > >> concern about changing existing behavior (potentially breaking)
> and
> > > the
> > > > >> fact that this makes operators stateful. Per the review comments
> and
> > > an
> > > > >> informal discussion (meetings notes
> > > > >> <
> > > >
> > >
> >
> https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0osesPG7jVZ3oU/edit#
> > > > >
> > > > >> and #sig-async-operators) I'd like to modify the approach #6370 to
> > > only
> > > > >> skip clearing of XCom if the Xom key is prefixed with
> > > > >> `airflow.models.xcom.DO_NOT_CLEAR_PREFIX = "_STATEFUL_"` or
> similar.
> > > > >>
> > > > >> [image: image.png]
> > > > >> --
> > > > >>
> > > > >> *Jacob Ferriero*
> > > > >>
> > > > >> Strategic Cloud Engineer: Data Engineering
> > > > >>
> > > > >> jferri...@google.com
> > > > >>
> > > > >> 617-714-2509 <(617)%20714-2509>
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > *Jacob Ferriero*
> > > > >
> > > > > Strategic Cloud Engineer: Data Engineering
> > > > >
> > > > > jferri...@google.com
> > > > >
> > > > > 617-714-2509
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Jarek Potiuk
> > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > >
> > > M: +48 660 796 129 <+48660796129>
> > > [image: Polidea] <https://www.polidea.com/>
> > >
> >
> >
> > --
> > Darren L. Weber, Ph.D.
> > http://psdlw.users.sourceforge.net/
> > http://psdlw.users.sourceforge.net/wordpress/
> >
>


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Reply via email to