Not sure whether to add to this email thread or to the Google Doc (not sure whether that Google Doc is just meeting notes or whether it should evolve into a spec :grin:).
Maybe a stupid suggestion, but here it is anyway:

- XCom: communication between elements of a DAG
- XState: a key/value store available for each element of a DAG

The idea is to clearly separate the behavior of a stateful resource (XState) from one that is not intended to be stateful (XCom), if that makes any sense. (Creating a new XState feature would be similar to adding a new DB table, I guess.)

Just to explain what I understand about the goals, i.e. how Airflow should behave once an operator has some ability to reschedule pokes, and about the scope of the changes: in the big picture, it's important that Airflow can resurrect a DAG on a restart when some elements of the DAG contain operators/sensors that depend on external cloud operations (e.g. AWS Batch). This is feasible when Airflow can persist any unique job ID defined by the external job provider (e.g. the AWS Batch "jobId") and any related identifiers for the job (e.g. the AWS Batch infrastructure ARNs for the job queue, compute environment, etc.; all of this detail is already captured in the AwsBatchOperator).

Suppose Airflow runs a DAG that spins up hundreds or thousands of such external jobs and persists each external "jobId". When Airflow crashes, or is stopped for an upgrade etc. and then restarted, the operators that submitted the jobs should be able to check on the state of those previously submitted jobs. If the jobs are still running on the external provider (e.g. AWS Batch), the operators should resume monitoring (poking) the job status without re-submitting duplicate jobs. Also, a failure to poke a job should get some level of poke-retry behavior rather than immediately failing the Airflow task, because a failed task could end up re-submitting the same job that is already running.
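A minimal, self-contained Python sketch of the resume-instead-of-resubmit behavior described above, assuming a per-task key/value store along the lines of the proposed XState. Everything in it is hypothetical: `InMemoryTaskState` stands in for a DB-backed store, `FakeJobService` stands in for an external provider such as AWS Batch (it is not the boto3 or AwsBatchOperator API), and `submit_or_resume`/`poke` are illustrative names, not Airflow functions.

```python
from typing import Dict, Optional


class InMemoryTaskState:
    """Hypothetical stand-in for a per-task key/value store ("XState").

    A real store would live in the Airflow metadata DB so that it survives a
    restart; this dict only illustrates the access pattern.
    """

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, str]] = {}

    def get(self, task_key: str, name: str) -> Optional[str]:
        return self._data.get(task_key, {}).get(name)

    def set(self, task_key: str, name: str, value: str) -> None:
        self._data.setdefault(task_key, {})[name] = value


class FakeJobService:
    """Hypothetical external job provider; not the AWS Batch API."""

    def __init__(self) -> None:
        self.submitted = 0
        self.failing_polls = 0  # how many upcoming status() calls will fail
        self.statuses: Dict[str, str] = {}

    def submit(self) -> str:
        self.submitted += 1
        job_id = f"job-{self.submitted}"
        self.statuses[job_id] = "RUNNING"
        return job_id

    def status(self, job_id: str) -> str:
        if self.failing_polls > 0:
            self.failing_polls -= 1
            raise ConnectionError("transient error while polling the provider")
        return self.statuses[job_id]


def submit_or_resume(state: InMemoryTaskState, service: FakeJobService, task_key: str) -> str:
    """Return the job ID to monitor, submitting a new job only if none is stored."""
    job_id = state.get(task_key, "job_id")
    if job_id is None:
        job_id = service.submit()
        # Persist right after submission. A crash between submit() and set()
        # could still orphan a job; a real implementation must handle that gap.
        state.set(task_key, "job_id", job_id)
    return job_id


def poke(state: InMemoryTaskState, service: FakeJobService, task_key: str,
         max_poke_failures: int = 3) -> bool:
    """One poke: True once the job succeeded, False to reschedule and poke again."""
    job_id = submit_or_resume(state, service, task_key)
    try:
        status = service.status(job_id)
    except ConnectionError:
        failures = int(state.get(task_key, "poke_failures") or 0) + 1
        state.set(task_key, "poke_failures", str(failures))
        if failures >= max_poke_failures:
            raise  # fail the task only after repeated consecutive poke errors
        return False  # tolerate the error; the job is still tracked by its ID
    state.set(task_key, "poke_failures", "0")
    if status == "FAILED":
        raise RuntimeError(f"external job {job_id} failed")
    return status == "SUCCEEDED"


# Usage: `state` plays the role of the persisted store, so reusing it across
# pokes mimics the operator picking up again after a reschedule or restart.
state, service = InMemoryTaskState(), FakeJobService()
key = "example_dag.batch_task.2020-01-08"
service.failing_polls = 1
assert poke(state, service, key) is False  # transient poll error tolerated
assert poke(state, service, key) is False  # job still RUNNING
service.statuses["job-1"] = "SUCCEEDED"     # job finishes on the provider side
assert poke(state, service, key) is True   # the same job is found, not resubmitted
assert service.submitted == 1
```

The key choice is that the job ID is looked up before any submission and written right after it, so a resumed operator pokes the existing job instead of creating a duplicate; transient poke errors are counted in the same store and only fail the task after `max_poke_failures` consecutive errors.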
So, in that context, what is the scope of the "reschedule-poke" changes? Do they simply release a worker, so that the rescheduled task can resume poking as long as Airflow stays up (has not crashed), while a crash starts the whole thing over again because the task state is not resilient to Airflow crashing? Or does the "reschedule-poke" work also provide resilience when Airflow crashes? If the goal is resilience to Airflow crashes, what else would the "reschedule-poke" work need in order to accomplish that goal, if it doesn't already? (Or would the architecture for Airflow resilience be out of scope here because it involves more complexity, like a Kafka cluster?)

--
Darren

On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk <[email protected]> wrote:
> Commented as well. I think we are really going in a good direction!
>
> On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko <[email protected]> wrote:
> > Thanks, Jacob, for building the document. I think we're on the right track.
> > I've added some comments and clarifications to the document to validate
> > that we're looking in the same direction. Would love to get more people's
> > opinions on this.
> >
> > Cheers, Fokko
> >
> > On Wed, Jan 8, 2020 at 3:31 AM Jacob Ferriero <[email protected]> wrote:
> > > The image is not working on the dev list; here is a link to the GitHub
> > > review comment containing said image:
> > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > >
> > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <[email protected]> wrote:
> > > > Hello Dev List,
> > > >
> > > > The inspiration for this is to allow operators to start a long-running
> > > > task on an external system and reschedule pokes for completion (e.g. a
> > > > Spark job on Dataproc), instead of blocking a worker (sketched out in
> > > > #6210 <https://github.com/apache/airflow/pull/6210>), so that slots are
> > > > freed up between pokes.
> > > > To do this requires supporting a method for storing task state
> > > > between reschedules.
> > > >
> > > > It's worth noting that a task would maintain state only during
> > > > reschedules but clear state on retries. In this way the task is
> > > > idempotent before reaching a terminal state [SUCCESS, FAIL,
> > > > UP_FOR_RETRY]. This brings up a question of the scope of our commitment
> > > > to idempotency of operators. If it is deemed acceptable for reschedules
> > > > to maintain some state, then we can free up workers between pokes.
> > > >
> > > > Because this is very similar to the purpose of XCom, it's been
> > > > postulated that we should support this behavior in XCom rather than
> > > > provide a new model in the DB for TaskState. (Though discussion here on
> > > > which is more appropriate is more than welcome.)
> > > >
> > > > I'd like to put forward a proposal to resurrect the reverted #6370
> > > > <https://github.com/apache/airflow/pull/6370> in order to modify the
> > > > lifetime of XComs under certain conditions. The diagram below helps
> > > > illustrate the change originally proposed in #6370. There was concern
> > > > about changing existing behavior (potentially breaking) and about the
> > > > fact that this makes operators stateful. Per the review comments and an
> > > > informal discussion (meeting notes
> > > > <https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0osesPG7jVZ3oU/edit#>
> > > > and #sig-async-operators), I'd like to modify the approach of #6370 to
> > > > only skip clearing of XCom if the XCom key is prefixed with
> > > > `airflow.models.xcom.DO_NOT_CLEAR_PREFIX = "_STATEFUL_"` or similar.
> > > >
> > > > [image: diagram of the change originally proposed in #6370]
> > > > --
> > > > *Jacob Ferriero*
> > > > Strategic Cloud Engineer: Data Engineering
> > > > [email protected]
> > > > 617-714-2509
>
> --
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
> M: +48 660 796 129

--
Darren L. Weber, Ph.D.
http://psdlw.users.sourceforge.net/
http://psdlw.users.sourceforge.net/wordpress/
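A sketch of the XCom-clearing rule proposed in Jacob's quoted message, in plain Python and not tied to Airflow's actual `XCom` model. `RunReason` and `xcoms_after_clearing` are hypothetical names used only for illustration; the `_STATEFUL_` prefix comes from the proposal, which itself says "or similar". Prefixed keys survive a reschedule, while a retry clears everything.

```python
from enum import Enum
from typing import Any, Dict

# Constant name and value as floated in the proposal, which says "or similar".
DO_NOT_CLEAR_PREFIX = "_STATEFUL_"


class RunReason(Enum):
    """Hypothetical, simplified reason a task instance starts (again)."""

    FIRST_TRY = "first_try"
    RESCHEDULE = "reschedule"  # the task released its slot between pokes
    RETRY = "retry"            # a previous try failed and the task is retried


def xcoms_after_clearing(xcoms: Dict[str, Any], reason: RunReason) -> Dict[str, Any]:
    """Return the XCom entries that survive the clearing done at task start.

    Prefixed keys survive a reschedule only; a first try or a retry starts with
    no XComs, which keeps retries idempotent.
    """
    if reason is RunReason.RESCHEDULE:
        return {k: v for k, v in xcoms.items() if k.startswith(DO_NOT_CLEAR_PREFIX)}
    return {}


existing = {"_STATEFUL_job_id": "job-123", "return_value": 42}
print(xcoms_after_clearing(existing, RunReason.RESCHEDULE))  # {'_STATEFUL_job_id': 'job-123'}
print(xcoms_after_clearing(existing, RunReason.RETRY))       # {}
```

Because the rule only spares keys that opt in through the prefix, existing unprefixed XComs keep being cleared on every run, which speaks to the concern about changing existing (potentially breaking) behavior.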
