One more point to discuss here. As the original author of the idea of using a prefix in XCom, I think that after the discussions I have changed my mind. Simply adding a field to an existing table (TaskReschedule?) where we could keep all the data that needs to be persisted seems to be a good idea. We do not impact performance too much (the table is already queried), we do not add too much complexity, and we do not try to introduce a generic "state" storage - this would be a solution dedicated solely to handling rescheduling.
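To make that concrete, here is a minimal sketch of what I mean. The `reschedule_payload` column name and the helper function are purely hypothetical, just for illustration, not an agreed design:

    # Sketch only: persist sensor state on the existing task_reschedule table.
    # Assumes a hypothetical new nullable column added via an Alembic migration:
    #     reschedule_payload = Column(Text, nullable=True)
    import json

    from airflow.models.taskreschedule import TaskReschedule
    from airflow.settings import Session

    def load_reschedule_payload(ti):
        """Return state stored on the latest reschedule row of the current try, if any."""
        session = Session()
        try:
            tr = (
                session.query(TaskReschedule)
                .filter_by(
                    dag_id=ti.dag_id,
                    task_id=ti.task_id,
                    execution_date=ti.execution_date,
                    try_number=ti.try_number,
                )
                .order_by(TaskReschedule.id.desc())
                .first()
            )
            if tr is None or not tr.reschedule_payload:
                return None
            return json.loads(tr.reschedule_payload)
        finally:
            session.close()

Because the reschedule rows are scoped by try_number, state read this way would apply only to the current try, which matches the "keep state across reschedules, clear it on retries" behaviour discussed further down the thread.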
On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko <fo...@driesprong.frl> wrote:

> The repoke logic, as it is now implemented with the sensor, is able to recover from an unexpected crash. After each poke, it will just go to sleep. If the process crashes in between, it might become a zombie task in the end, but this is also taken care of by the scheduler. In this case, the scheduler thinks the task is still running, but in reality it has crashed. There is a timeout that will reset the execution. Hopefully this doesn't happen often, and it should only occur when something is off (for example a machine crashed, or a network partition, etc.). HTH
>
> Personally I don't like duplicating the same table for such a similar use case. But that's a design choice, I guess.
>
> If we go for the async executor, the above might be different. I think it is good not to include this in the discussion.
>
> Cheers, Fokko
>
> On Thu, Jan 9, 2020 at 19:33, Darren Weber <dweber.consult...@gmail.com> wrote:
>
> > Not sure whether to add to this email thread or the google-doc (not sure if that google-doc is just meeting notes or if it should evolve into a spec :grin:).
> >
> > Maybe a stupid suggestion, but here it is anyway:
> >
> > XCom - communication between elements of a DAG
> >
> > XState - key/value store available for each element of a DAG
> >
> > Clearly separate the behavior of a stateful resource (XState) from one that is not intended to be stateful (XCom), if that makes any sense? (Creating a new XState feature is similar to a new db-table, I guess.)
> >
> > Just to explain what I understand about the goals of how Airflow should behave when it has some ability for an operator to reschedule pokes, and the scope of the changes: In the big picture, it's important that Airflow can resurrect a DAG on a restart when some elements of the DAG contain operators/sensors that depend on external cloud operations (e.g. AWS Batch). This is feasible when Airflow can persist any unique job ID defined by the external job provider (e.g. the AWS Batch "jobId") and any related identifiers for the job (e.g. AWS Batch infrastructure ARNs for the batch queue/compute-env, etc.; all of this detail is captured in the AwsBatchOperator already). Assuming Airflow runs a DAG that spins up hundreds or thousands of such external jobs and persists the external "jobId", when Airflow crashes or is stopped for upgrades etc. and is restarted, the operators that submitted the jobs should be able to check on the state of those previously submitted jobs. If the jobs are still running on the external provider (e.g. AWS Batch), the operator should be able to resume monitoring (poking) the job status without re-submitting a duplicate job (also, any failure to poke a job should have some level of poke-retry behavior that does not immediately fail the Airflow task and thereby somehow re-submit a job that is already running). So, in that context, what is the scope of the "reschedule-poke" changes - do they simply release a worker, so that as long as Airflow is "up" (has not crashed) the reschedule can resume poking, but if Airflow crashes the whole thing starts over again because the state of the task is not resilient to Airflow crashing? Or does the work on "reschedule-poke" also provide resilience when Airflow crashes?
> > If the goal is to be resilient to Airflow crashes, what is required for the "reschedule-poke" work to accomplish that goal, if it doesn't already? (Would the architecture for Airflow resilience be out of scope in this context because it involves more complexity, like a Kafka cluster?)
> >
> > -- Darren
> >
> > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk <jarek.pot...@polidea.com> wrote:
> >
> > > Commented as well. I think we are really going in a good direction!
> > >
> > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko <fo...@driesprong.frl> wrote:
> > >
> > > > Thanks Jacob for building the document. I think we're on the right track. I've added some comments and clarifications to the document, to validate that we're looking in the same direction. Would love to get more people's opinions on this.
> > > >
> > > > Cheers, Fokko
> > > >
> > > > On Wed, Jan 8, 2020 at 03:31, Jacob Ferriero <jferri...@google.com.invalid> wrote:
> > > >
> > > > > The image is not working on the dev list; here is a link to the GitHub review comment containing said image: https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > > > >
> > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <jferri...@google.com> wrote:
> > > > >
> > > > >> Hello Dev List,
> > > > >>
> > > > >> The inspiration for this is to allow operators to start a long-running task on an external system and reschedule pokes for completion (e.g. a Spark job on Dataproc) instead of blocking a worker (sketched out in #6210 <https://github.com/apache/airflow/pull/6210>), to allow freeing up slots between pokes. Doing this requires supporting a method for storing task state between reschedules.
> > > > >>
> > > > >> It's worth noting that a task would maintain state only during reschedules but clear state on retries. In this way the task is idempotent before reaching a terminal state [SUCCESS, FAILED, UP_FOR_RETRY]. This brings up a question about the scope of our commitment to idempotency of operators. If it is deemed acceptable for reschedules to maintain some state, then we can free up workers between pokes.
> > > > >>
> > > > >> Because this is very similar to the purpose of XCom, it's been postulated that we should support this behavior in XCom rather than provide a new model in the db for TaskState. (Though discussion here on which is more appropriate is more than welcome.)
> > > > >>
> > > > >> I'd like to put forward a proposal to resurrect the reverted #6370 <https://github.com/apache/airflow/pull/6370> in order to provide a modification to the lifetime of XComs under certain conditions. The diagram below helps illustrate the change originally proposed in #6370. There was concern about changing existing behavior (potentially breaking) and the fact that this makes operators stateful.
> > > > >>
> > > > >> Per the review comments and an informal discussion (meeting notes <https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0osesPG7jVZ3oU/edit#> and #sig-async-operators), I'd like to modify the approach of #6370 so that clearing of an XCom is skipped only if the XCom key is prefixed with `airflow.models.xcom.DO_NOT_CLEAR_PREFIX = "_STATEFUL_"` or similar.
> > > > >>
> > > > >> [image: image.png]
> > > > >>
> > > > >> --
> > > > >> Jacob Ferriero
> > > > >> Strategic Cloud Engineer: Data Engineering
> > > > >> jferri...@google.com
> > > > >> 617-714-2509
> >
> > --
> > Darren L. Weber, Ph.D.
> > http://psdlw.users.sourceforge.net/
> > http://psdlw.users.sourceforge.net/wordpress/

--
Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer
M: +48 660 796 129
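For reference, the prefix idea discussed above boils down to roughly the following at the point where a task instance clears its XComs before a fresh run. This is only an illustrative sketch of the idea as described in the thread, not the actual diff from #6370, and the helper name is made up:

    # Sketch of the "stateful key prefix" idea - illustration only, not the #6370 patch.
    from sqlalchemy import and_, not_

    from airflow.models.xcom import XCom
    from airflow.settings import Session

    DO_NOT_CLEAR_PREFIX = "_STATEFUL_"

    def clear_xcoms_except_stateful(ti):
        """Delete this task instance's XComs, keeping keys that opt in to surviving reschedules."""
        session = Session()
        try:
            session.query(XCom).filter(
                and_(
                    XCom.dag_id == ti.dag_id,
                    XCom.task_id == ti.task_id,
                    XCom.execution_date == ti.execution_date,
                    not_(XCom.key.startswith(DO_NOT_CLEAR_PREFIX)),
                )
            ).delete(synchronize_session=False)
            session.commit()
        finally:
            session.close()

Keys written with the prefix would then survive reschedules, while retries would still need to clear them (i.e. not take the skip path) so that a retry starts from a clean slate, matching the "state during reschedules, cleared on retries" behaviour described earlier in the thread.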