@Fokko: If we go ahead with storing this info in XCom, is your suggestion to use a fixed prefix?
Cheers,
Kaxil

On Sat, Jan 11, 2020, 14:50 Driesprong, Fokko <[email protected]> wrote:

> I would still be in favor of pushing this into XCom, but without changing the behavior of the current XCom implementation. XCom is currently used for intra-communication, but it could also be used for inter-communication; to me the two are very closely related. So I would add an option to explicitly include the state of the current operator. TaskReschedule, as Jarek mentioned, would also be an option, but it works at the TaskRun level, and I believe this should be at the Task level. As mentioned earlier, Variables feel very messy to me. They are global, so would you then have to template the dag_id and task_id into the key? It would also create a lot of entries in the table.
>
> Regarding the FTP issues: Airflow is not going to magically fix your FTP connections. If the FTP server is down, it is okay for the operator to fail and retry later, when the FTP server is hopefully back up. If there are flaky network issues, you should implement a retry mechanism. We had a similar use case with HTTP: an Airflow user was paging through a REST API. If fetching one of the pages failed, the operator would fail and have to start all over again. This has been fixed using Tenacity:
>
> https://github.com/apache/airflow/blob/fd78c65cabae2241a4c1d3a792e00620049cbf3e/airflow/hooks/http_hook.py#L186
>
> Ideally, the FTP path would contain the day, so you know which files to copy for which day, and you can also backfill. This avoids race conditions and lets you process multiple days in parallel.
>
> This would also be possible with XCom. First, have an operator that lists the files on the FTP site and pushes the list to XCom. Have another operator that lists the files you already have and pushes that to XCom as well.
> Using a Python operator, you can easily compute the diff, and then you know which files to download. In this case, you should limit the number of active DAG runs to one, to avoid race conditions.
>
> I believe that for many use cases you don't need to keep state in Airflow. It might be convenient, but it just shifts the problem. If you can fetch the state from somewhere external, and that is the single source of truth, then that should be the preferred solution.
>
> Cheers,
> Fokko
>
> On Sat, 11 Jan 2020 at 04:21, Kaxil Naik <[email protected]> wrote:
>
> > Hey all,
> >
> > Really good document, Jacob.
> >
> > Below are my thoughts on the different topics discussed in the doc and on the mailing list:
> >
> > *Prefix on XCom*
> > I don't think it is a good idea to mix this into XCom. We should let XCom be used for exactly one purpose.
> >
> > *Storing state in XCom between retries*
> > This is definitely going to break idempotency. When default retries are enabled, this is going to create undesired effects.
> >
> > @Daniel Standish: I would like to better understand the need for stateful sets. If you can give us more scenarios where you think stateful sets would solve the issue, please let us know. Also, why do you think Variables are not the right solution for it?
> >
> > I would imagine your custom operator can store some state in Variables. For example, you can store a JSON document containing the following in Airflow Variables:
> >
> > - all_files
> > - files_copied
> >
> > In this case, the Variable would have the details needed to resume copying from where it stopped. As a first step, your custom operator should check the Variable (deserializing the JSON).
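A minimal sketch of Kaxil's Variables idea combined with Fokko's diff step, under stated assumptions: a plain dict stands in for the Variables table (a real operator would presumably go through `Variable.get(..., deserialize_json=True)` and `Variable.set(..., serialize_json=True)`), and the key scheme, helper names, and `copy_one` callable are hypothetical. The key is namespaced by `dag_id` and `task_id` because, as Fokko points out, Variables are global.

```python
import json
from typing import Callable, Dict, List

# Stand-in for the Airflow Variables table: key -> JSON string (assumption).
# A real operator would presumably call Variable.get/Variable.set instead.
FAKE_VARIABLES: Dict[str, str] = {}


def state_key(dag_id: str, task_id: str) -> str:
    # Hypothetical naming scheme; Variables are global, so namespace the key.
    return f"{dag_id}.{task_id}.copy_state"


def load_state(key: str, all_files: List[str]) -> dict:
    raw = FAKE_VARIABLES.get(key)
    if raw is None:
        # First run: nothing has been copied yet.
        return {"all_files": list(all_files), "files_copied": []}
    return json.loads(raw)


def remaining_files(state: dict) -> List[str]:
    # The diff: listed files minus already-copied files, in listing order.
    copied = set(state["files_copied"])
    return [f for f in state["all_files"] if f not in copied]


def copy_files(dag_id: str, task_id: str, all_files: List[str],
               copy_one: Callable[[str], None]) -> List[str]:
    key = state_key(dag_id, task_id)
    state = load_state(key, all_files)  # first step: check the Variable
    copied_now = []
    for name in remaining_files(state):
        copy_one(name)  # may raise; earlier progress is already saved
        state["files_copied"].append(name)
        FAKE_VARIABLES[key] = json.dumps(state)  # persist after every file
        copied_now.append(name)
    return copied_now


files = ["a.csv", "b.csv", "c.csv", "d.csv"]


def flaky_copy(name: str) -> None:
    # Simulated failure partway through the first run.
    if name == "c.csv":
        raise ConnectionError("FTP connection dropped")


try:
    copy_files("ftp_dag", "download", files, flaky_copy)
except ConnectionError:
    pass  # the task fails after copying a.csv and b.csv

# The next attempt resumes from c.csv.
print(copy_files("ftp_dag", "download", files, lambda name: None))  # ['c.csv', 'd.csv']
```

Progress is saved after every file, so a failed run loses at most the file in flight. This is also the kind of external state Jarek cautions against in the thread: if the copied files are later deleted, the Variable still reports them as copied.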
> >
> > *A new structure for storing state across reschedules*
> > This can be a new table with a relationship to the TI table, or just a new column that is loaded only when using reschedule/async operators or sensors.
> >
> > Regards,
> > Kaxil
> >
> > On Fri, Jan 10, 2020 at 11:45 PM Yingbo Wang <[email protected]> wrote:
> >
> > > The updated AIP with the smart sensor design and part of the implementation is at
> > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
> > > Open source PR: https://github.com/apache/airflow/pull/5499
> > >
> > > On Fri, Jan 10, 2020 at 1:44 PM Alex Guziel <[email protected].invalid> wrote:
> > >
> > > > I feel like for this, we can incorporate the smart sensor we have implemented at Airbnb, which we plan to open source.
> > > >
> > > > The TL;DR is that it works by having the sensor task run briefly and materialize some state into the DB, which master sensor tasks then poke for. This can be done with custom time intervals.
> > > >
> > > > On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish <[email protected]> wrote:
> > > >
> > > > > I am also a big fan of adding better support for stateful tasks, though I know this is a thorny subject in the Airflow community.
> > > > >
> > > > > There are many data warehousing tasks where state makes a lot of sense. While idempotence is a nice design pattern, it's not the solution for every problem.
> > > > >
> > > > > XCom may not be the way, but there should be a way. Variables work, but to me it makes sense to have a separate structure that is associated with the task, the DAG, or the task instance.
> > > > >
> > > > > On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P.
> > > > > <[email protected]> wrote:
> > > > >
> > > > > > FYI, the design in the already-discussed PR would allow state to be persisted across retries:
> > > > > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > > > > > While I agree you are correct in most cases, I would greatly appreciate it if the design for keeping state across reschedules did not explicitly exclude this capability.
> > > > > >
> > > > > > In lots of cases I already do exactly what you suggest: I scan the state of the target system and resume from there. However, in lengthy pipelines this becomes complex. For example, I have a pipeline that goes something like: FTP Download -> Decrypt File and Zip File -> Upload to Jump Host and Remove Zip File -> Store in S3 Bucket.
> > > > > >
> > > > > > The data needs to be available at the end state as soon as possible, so the decryption operator is a sensor that is already running, waits for the file to become available, and immediately decrypts and zips it; the same goes for the upload operator. From inside the corporate network it's not possible to check the state of the S3 bucket, so the original FTP Download process cannot check the state of the final target system. Even if it could, this could lead to a race condition while the data is in transit.
> > > > > >
> > > > > > I guess in environments where you have a lot of control and aren't beholden to capricious policy, audit, and regulatory requirements, such scenarios must indeed seem niche :).
> > > > > > Anyway, we have a solution; I'm just asking that you don't go out of your way to stop users from shooting themselves in the foot if they're really determined to.
> > > > > >
> > > > > > Damian
> > > > > >
> > > > > > -----Original Message-----
> > > > > > From: Chris Palmer <[email protected]>
> > > > > > Sent: Friday, January 10, 2020 13:37
> > > > > > To: [email protected]
> > > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators
> > > > > >
> > > > > > I agree with Jarek that maintaining state between retries is not the right thing to do. To be honest, I'm not even convinced of the need for state between reschedules myself.
> > > > > >
> > > > > > While I know from past experience that FTP is a pain to deal with, I think your example is a pretty niche one. Additionally, when thinking about idempotent task design, lots of tasks utilize state that exists in other systems. You should be thinking about what state you want some external system to be in after the task has run, rather than precisely what actions you want the task to perform.
> > > > > >
> > > > > > It's the subtle difference between:
> > > > > >
> > > > > > "When it runs, this task should create the required table in my database" (by running a simple 'CREATE TABLE foobar .....')
> > > > > >
> > > > > > and
> > > > > >
> > > > > > "After this task has finished, the required table should exist in my database" (by running 'CREATE TABLE IF NOT EXISTS foobar .....')
> > > > > >
> > > > > > The first will fail if run repeatedly (without someone taking some other action, like deleting the table).
> > > > > > The second can be run as many times as you want without error, but it relies on the state maintained by your database.
> > > > > >
> > > > > > In your case, I think the external state you should care about is the file system you are downloading the files to, as opposed to some external table that could get out of sync with the file system. So I would write the operator so that the first thing it does is compare the complete list with what already exists in the destination, and then only attempt to download the files that are missing.
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk <[email protected]> wrote:
> > > > > >
> > > > > > > I wonder what others think of it.
> > > > > > >
> > > > > > > On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <[email protected]> wrote:
> > > > > > >
> > > > > > > > I don't believe so. The default should be that state isn't preserved across retries; it should just be possible for users to enable it if they are willing to take on that complexity.
> > > > > > > >
> > > > > > > > We have lots of operators that already do this: if they fail partway through a job, the overhead of resuming from the beginning, rather than keeping state on their progress, is too much. It's just annoying that we have to keep this state outside Airflow, as it requires extra infrastructure for our task scheduling.
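Chris's distinction can be checked with Python's built-in `sqlite3` module, with SQLite standing in for "my database" and a made-up `foobar` schema. Each statement is executed twice against the same connection:

```python
import sqlite3
from typing import List


def run_twice(ddl: str) -> List[str]:
    """Execute the same DDL twice on one fresh in-memory database."""
    conn = sqlite3.connect(":memory:")
    outcomes = []
    try:
        for _ in range(2):
            try:
                conn.execute(ddl)
                outcomes.append("ok")
            except sqlite3.OperationalError as exc:
                # Record the failure instead of aborting, to compare both runs.
                outcomes.append(f"error: {exc}")
    finally:
        conn.close()
    return outcomes


# Action-oriented: "when it runs, create the table".
print(run_twice("CREATE TABLE foobar (id INTEGER PRIMARY KEY)"))
# ['ok', 'error: table foobar already exists']

# Outcome-oriented: "after it runs, the table exists".
print(run_twice("CREATE TABLE IF NOT EXISTS foobar (id INTEGER PRIMARY KEY)"))
# ['ok', 'ok']
```

The action-oriented statement fails on the second run, while the outcome-oriented one succeeds both times because it defers to the state the database already holds.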
> > > > > > > >
> > > > > > > > For example, we have an FTP site from which we need to download 250 files. The full file list is provided to the operator. The FTP connection is very unreliable and the job often fails midway; on retry we don't want to resume from the beginning of the job, so we store the state of our progress in a table outside Airflow. We can't split the job into 250 tasks, because the FTP site only accepts one connection at a time, so the overhead of 250 logins would add an hour to the process, and it would make the Airflow UI nearly unusable.
> > > > > > >
> > > > > > > I do not know all the details, of course, but your case seems to be solvable much more easily and in the "Airflow" way. You can have a custom operator that keeps running until everything is downloaded and retries failed transfers. The state of which files have been downloaded should be kept in memory, and even if an FTP operation fails, the operator should retry each failed file rather than fail as a whole. That would keep it idempotent and keep the state in memory rather than in Airflow's DB or in an external system. Even if you already have an operator that transfers X files and you do not want to change it, you can likely wrap or extend it to keep the list of files in memory and retry only the files that have failed so far. IMHO, your solution does exactly what you are not supposed to do according to Airflow's design: unless you add some extra logic and complexity, your operator is not idempotent.
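A minimal sketch of the operator Jarek describes: the set of finished files lives only in memory, and each round retries just the files that failed. The `fetch` callable is a hypothetical stand-in for a single-file transfer over one shared FTP connection (which also respects Damian's one-connection limit). Fokko's message points to Tenacity for this kind of retrying; this sketch hand-rolls the loop to stay dependency-free.

```python
import ftplib
import time
from typing import Callable, Iterable, List, Set


def download_all(files: Iterable[str], fetch: Callable[[str], None],
                 max_rounds: int = 5, base_delay: float = 1.0) -> List[str]:
    """Download every file, retrying only the ones that failed.

    Progress lives in local variables only: a task retry or a backfill
    starts with an empty `done` set and downloads everything again.
    """
    pending: List[str] = list(files)
    done: Set[str] = set()
    for attempt in range(max_rounds):
        if attempt:
            # Exponential backoff before each retry round: 1x, 2x, 4x base_delay.
            time.sleep(base_delay * 2 ** (attempt - 1))
        failed: List[str] = []
        for name in pending:
            try:
                fetch(name)  # hypothetical single-file transfer
                done.add(name)
            except ftplib.all_errors:  # FTP protocol errors, OSError, EOFError
                failed.append(name)
        if not failed:
            return sorted(done)
        pending = failed
    raise RuntimeError(f"gave up after {max_rounds} rounds; still failing: {pending}")


calls = {}


def flaky_fetch(name: str) -> None:
    # Simulated flaky server: "f2.csv" fails twice before succeeding.
    calls[name] = calls.get(name, 0) + 1
    if name == "f2.csv" and calls[name] < 3:
        raise ConnectionError("connection reset")


print(download_all(["f1.csv", "f2.csv", "f3.csv"], flaky_fetch, base_delay=0.0))
```

Only `f2.csv` is fetched again in later rounds; the files that already succeeded are never re-downloaded within the run.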
> > > > > > >
> > > > > > > For example, if you delete the downloaded files for whatever reason, keep the external state, and run a backfill, I believe that (unless you have some extra logic) the operator will see from the external state that the files were already downloaded and will not download them again. If you use in-memory state, it will work as expected: the next time you run it via backfill, it will re-download all the files.
> > > > > > >
> > > > > > > J.
> > > > > > >
> > > > > > > > Damian
> > > > > > > >
> > > > > > > > -----Original Message-----
> > > > > > > > From: Jarek Potiuk <[email protected]>
> > > > > > > > Sent: Friday, January 10, 2020 11:45
> > > > > > > > To: [email protected]
> > > > > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators
> > > > > > > >
> > > > > > > > Just opening Pandora's box :).
> > > > > > > >
> > > > > > > > I personally think we should not keep state between retries. It opens up a whole host of problems and edge cases, and allows people to solve problems in non-Airflow-y ways, losing some important properties (mainly idempotency). Tasks in Airflow should be idempotent and stateless (from the operator author's point of view).
> > > > > > > >
> > > > > > > > I think there is quite a big conceptual difference between keeping the reschedule state (which just optimizes the execution of the same task) and keeping state between retries.
> > > > > > > >
> > > > > > > > Right now, when you write your operator, it's simple: there is no state to handle. XComs (and everything else) are cleared when the task is re-run.
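Jarek's backfill scenario can be simulated with empty files in a temporary directory. Here `download` is a hypothetical stand-in for the transfer step: passing a long-lived `progress` set models a progress table kept outside Airflow, while omitting it gives the fresh in-memory state he recommends.

```python
import os
import shutil
import tempfile
from typing import List, Optional, Set, Tuple


def download(files: List[str], dest: str,
             progress: Optional[Set[str]] = None) -> List[str]:
    """Hypothetical transfer step that 'downloads' by creating empty files.

    progress=None  -> fresh in-memory state for this run only
    progress=<set> -> a record that outlives the run (stand-in for an
                      external progress table)
    """
    seen = set() if progress is None else progress
    written = []
    for name in files:
        if name in seen:
            continue  # trusts the record and never looks at `dest`
        with open(os.path.join(dest, name), "w"):
            pass
        seen.add(name)
        written.append(name)
    return written


def backfill_demo() -> Tuple[List[str], List[str]]:
    files = ["a.csv", "b.csv"]
    dest = tempfile.mkdtemp()
    external: Set[str] = set()
    try:
        download(files, dest, external)  # initial run writes both files
        for name in files:  # the output is deleted for some reason
            os.remove(os.path.join(dest, name))
        from_external = download(files, dest, external)  # backfill, stale record
        from_memory = download(files, dest)  # backfill, fresh state
        return from_external, from_memory
    finally:
        shutil.rmtree(dest)


print(backfill_demo())  # ([], ['a.csv', 'b.csv'])
```

With the external record, the backfill trusts stale state and writes nothing even though the destination is empty; the in-memory version simply downloads everything again.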
> > > > > > > >
> > > > > > > > With the poke reschedule proposal, the only thing you can do is save/retrieve a single ID attached to the current task instance. This ID will not be cleared on reschedule, but it will be cleared on retry.
> > > > > > > >
> > > > > > > > If we introduce saving state on retries, it opens up a lot of questions: Should we keep state for all retries, or just one? What data should we keep; should we allow more structured data? What guidelines should people follow when writing their operators? It's a totally different feature that should be discussed separately.
> > > > > > > >
> > > > > > > > J.
> > > > > > > >
> > > > > > > > On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > I just wanted to add that a related use case is task retries. There are lots of scenarios where keeping state between retries, as well as between reschedules, would be extremely helpful, so as long as the solution isn't overly narrow, I'd be extremely appreciative.
> > > > > > > > >
> > > > > > > > > Damian
> > > > > > > > >
> > > > > > > > > -----Original Message-----
> > > > > > > > > From: Jarek Potiuk <[email protected]>
> > > > > > > > > Sent: Friday, January 10, 2020 11:05
> > > > > > > > > To: [email protected]
> > > > > > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators
> > > > > > > > >
> > > > > > > > > Another point to discuss here: as the original author of the idea of using a prefix in XCom, I have changed my mind after the discussions.
> > > > > > > > > I think that simply adding a field to an existing table (TaskReschedule?), where we could keep all the data that needs to be persisted, seems to be a good idea. We do not impact performance too much (the table is already queried), we do not add too much complexity, and we do not try to introduce a generic "state" storage: this would be a solution dedicated to handling rescheduling only.
> > > > > > > > >
> > > > > > > > > On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > The repoke logic, as it is now implemented in the sensor, is able to recover from an unexpected crash. After each poke, it just goes to sleep. If the process crashes in between, it might end up as a zombie task, but this is also taken care of by the scheduler. In this case, the scheduler thinks the task is still running, but in reality it has crashed. There is a timeout that will reset the execution. Hopefully this doesn't happen often; it should only occur when something is off (for example, a machine crash or a network partition). HTH
> > > > > > > > > >
> > > > > > > > > > Personally, I don't like duplicating the same table for such a similar use case. But that's a design choice, I guess.
> > > > > > > > > >
> > > > > > > > > > If we go for the async executor, the above might be different. I think it is good not to include that in this discussion.
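The lifecycle Jarek describes (a single ID attached to the task instance, kept across reschedules but cleared on retry) can be modelled in a few lines. Everything here is hypothetical: the store is a toy stand-in for a field on a table such as TaskReschedule, not Airflow's schema, and `submit`/`is_done` stand in for an external client, for example the Dataproc or AWS Batch jobs mentioned in the thread.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional, Tuple

# Hypothetical task-instance key: (dag_id, task_id, execution_date).
TIKey = Tuple[str, str, str]


@dataclass
class RescheduleStateStore:
    """Toy model of reschedule-scoped state; not Airflow's actual schema."""

    _slots: Dict[TIKey, str] = field(default_factory=dict)

    def get(self, ti: TIKey) -> Optional[str]:
        return self._slots.get(ti)

    def put(self, ti: TIKey, value: str) -> None:
        self._slots[ti] = value

    def on_reschedule(self, ti: TIKey) -> None:
        # Keep the slot: the next poke continues the same attempt.
        pass

    def on_retry(self, ti: TIKey) -> None:
        # Drop the slot: a retry starts from scratch, preserving idempotency.
        self._slots.pop(ti, None)


def poke_external_job(store: RescheduleStateStore, ti: TIKey,
                      submit: Callable[[], str],
                      is_done: Callable[[str], bool]) -> bool:
    """Submit the external job once, then only poll it via the stored ID."""
    job_id = store.get(ti)
    if job_id is None:
        job_id = submit()
        store.put(ti, job_id)
    return is_done(job_id)


# Usage: two pokes separated by a reschedule reuse one job; a retry submits anew.
submitted = []


def fake_submit() -> str:
    submitted.append(f"job-{len(submitted) + 1}")
    return submitted[-1]


store = RescheduleStateStore()
ti = ("example_dag", "spark_job", "2020-01-10")
poke_external_job(store, ti, fake_submit, lambda job: False)  # submits job-1
store.on_reschedule(ti)
poke_external_job(store, ti, fake_submit, lambda job: True)  # reuses job-1
store.on_retry(ti)
poke_external_job(store, ti, fake_submit, lambda job: False)  # submits job-2
print(submitted)  # ['job-1', 'job-2']
```

Because the ID is the only thing stored, a restart between pokes can resume polling without resubmitting, while a retry still begins from a clean slate.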
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Fokko
> > > > > > > > > >
> > > > > > > > > > On Thu, 9 Jan 2020 at 19:33, Darren Weber <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Not sure whether to add to this email thread or the Google Doc (not sure if that Google Doc is just meeting notes or if it should evolve into a spec :grin:).
> > > > > > > > > > >
> > > > > > > > > > > Maybe a stupid suggestion, but here it is anyway:
> > > > > > > > > > >
> > > > > > > > > > > XCom - communication between elements of a DAG
> > > > > > > > > > >
> > > > > > > > > > > XState - key/value store available for each element of a DAG
> > > > > > > > > > >
> > > > > > > > > > > Clearly separate the behavior of a stateful resource (XState) from one that is not intended to be stateful (XCom), if that makes any sense? (Creating a new XState feature is similar to adding a new DB table, I guess.)
> > > > > > > > > > >
> > > > > > > > > > > Just to explain what I understand about the goals for how Airflow should behave when an operator has some ability to reschedule pokes, and the scope of the changes: in the big picture, it's important that Airflow can resurrect a DAG on restart when some elements of the DAG contain operators/sensors that depend on external cloud operations (e.g. AWS Batch).
> > > > > > > > > > > This is feasible when Airflow can persist any unique job ID defined by the external job provider (e.g. the AWS Batch "jobId") and any related identifiers for the job (e.g. AWS Batch infrastructure ARNs for the batch queue/compute environment, etc.; all of this detail is already captured in the AwsBatchOperator). Assume Airflow runs a DAG that spins up hundreds or thousands of such external jobs and persists the external "jobId". When Airflow crashes, or is stopped for upgrades etc. and restarted, the operators that submitted the jobs should be able to check on the state of those previously submitted jobs. If the jobs are still running on the external provider (e.g. AWS Batch), the operator should be able to resume monitoring (poking) the job status without re-submitting a duplicate job. (Also, any failure to poke a job should have some level of poke-retry behavior, rather than immediately failing the Airflow task and somehow re-submitting the same job that is already running.) So, in that context, what is the scope of the "reschedule-poke" changes? Do they simply release a worker, so that as long as Airflow is "up" (has not crashed) the reschedule can resume poking, but if Airflow crashes the whole thing starts over again because the task state is not resilient to Airflow crashing?
> > > > > > > > > > > Or does the "reschedule-poke" work also provide resilience when Airflow crashes? If the goal is to be resilient to Airflow crashes, what is required for the "reschedule-poke" work to accomplish that goal, if it doesn't already? (Would the architecture for Airflow resilience be out of scope in this context, because it involves more complexity, like a Kafka cluster?)
> > > > > > > > > > >
> > > > > > > > > > > -- Darren
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk <[email protected]> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Commented as well. I think we are really going in a good direction!
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko <[email protected]> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks, Jacob, for building the document. I think we're on the right track. I've added some comments and clarifications to the document, to validate that we're looking in the same direction. Would love to get more people's opinions on this.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > Fokko
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, 8 Jan 2020 at 03:31, Jacob Ferriero <[email protected]> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > The image is not working on the dev list; here is a link to the GitHub review comment containing the image:
> > > > > > > > > > > > > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <[email protected]> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hello Dev List,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The inspiration for this is to allow operators to start a long-running task on an external system and reschedule pokes for completion (e.g. a Spark job on Dataproc), instead of blocking a worker (sketched out in #6210 <https://github.com/apache/airflow/pull/6210>), so that slots are freed up between pokes. This requires supporting a method for storing task state between reschedules.
> > > > > > > > > > > > > > > It's worth noting that a task would maintain state only across reschedules, and would clear state on retries.
> > > > > > > > > > > > > > > In this way, the task is idempotent before reaching a terminal state [SUCCESS, FAILED, UP_FOR_RETRY]. This raises the question of the scope of our commitment to operator idempotency. If it is deemed acceptable for reschedules to maintain some state, then we can free up workers between pokes.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Because this is very similar to the purpose of XCom, it's been suggested that we should support this behavior in XCom rather than provide a new model in the DB for TaskState. (Discussion here on which is more appropriate is more than welcome, though.)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'd like to put forward a proposal to resurrect the reverted #6370 <https://github.com/apache/airflow/pull/6370> in order to modify the lifetime of XComs under certain conditions. The diagram below helps illustrate the change originally proposed in #6370.
> > > > > > > > > > > > > > > There was concern about changing existing (potentially breaking) behavior and about the fact that this makes operators stateful. Per the review comments and an informal discussion (meeting notes <https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0osesPG7jVZ3oU/edit#> and #sig-async-operators), I'd like to modify the approach of #6370 to skip clearing an XCom only if the XCom key is prefixed with `airflow.models.xcom.DO_NOT_CLEAR_PREFIX = "_STATEFUL_"` or similar.
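A minimal sketch of the clearing rule Jacob proposes, applied to an in-memory dict of XCom key/value pairs rather than the real XCom table. `DO_NOT_CLEAR_PREFIX` mirrors the constant named in the proposal and is not an existing Airflow setting; `stateful_key` and `xcoms_after_rerun` are hypothetical helpers.

```python
from typing import Dict

# Mirrors the constant named in the proposal (not an existing Airflow setting).
DO_NOT_CLEAR_PREFIX = "_STATEFUL_"


def stateful_key(name: str) -> str:
    """Hypothetical helper: mark an XCom key as surviving reschedules."""
    return DO_NOT_CLEAR_PREFIX + name


def xcoms_after_rerun(xcoms: Dict[str, object], is_retry: bool) -> Dict[str, object]:
    """Return the XComs left for a task instance when it runs again.

    Retry: everything is cleared, as the thread describes for any re-run today.
    Reschedule: only keys carrying the stateful prefix are kept.
    """
    if is_retry:
        return {}
    return {k: v for k, v in xcoms.items() if k.startswith(DO_NOT_CLEAR_PREFIX)}


xcoms = {stateful_key("job_id"): "job-42", "row_count": 10}
print(xcoms_after_rerun(xcoms, is_retry=False))  # {'_STATEFUL_job_id': 'job-42'}
print(xcoms_after_rerun(xcoms, is_retry=True))  # {}
```

Because only prefixed keys change behavior, XComs written by existing operators are cleared exactly as before.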
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [image: image.png]
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > *Jacob Ferriero*
> > > > > > > > > > > > > > > Strategic Cloud Engineer: Data Engineering
> > > > > > > > > > > > > > > [email protected]
> > > > > > > > > > > > > > > 617-714-2509
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Jarek Potiuk
> > > > > > > > > > > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > > > > > > > > > > > M: +48 660 796 129
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Darren L. Weber, Ph.D.
> > > > > > > > > > > http://psdlw.users.sourceforge.net/
> > > > > > > > > > > http://psdlw.users.sourceforge.net/wordpress/
> > > > > > > > >
> > > > > > > > > ===============================================================================
> > > > > > > > > Please access the attached hyperlink for an important electronic communications disclaimer:
> > > > > > > > > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> > > > > > > > > ===============================================================================
