The updated AIP with the smart sensor design and some implementation is at
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
Open source PR: https://github.com/apache/airflow/pull/5499
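As a rough illustration of the smart sensor idea described in the thread below (sensor tasks materialize their poke condition into a shared store, and a consolidated "master" sensor polls them all instead of each sensor occupying a worker slot), here is a toy sketch. None of these names come from the AIP-17 implementation; the dict stands in for the DB table the real design uses.

```python
# Toy model of the smart sensor pattern: sensors register what to poke
# instead of blocking a worker; one consolidated pass polls everything.

SENSOR_STATE = {}  # stand-in for the shared DB table in the real design

def register_sensor(task_id, poke_fn, poke_interval=60):
    """Run once per sensor task: persist the poke condition, then exit."""
    SENSOR_STATE[task_id] = {"poke": poke_fn, "interval": poke_interval, "done": False}

def smart_sensor_pass():
    """One pass of the consolidated 'master' sensor over all pending pokes."""
    for task_id, entry in SENSOR_STATE.items():
        if not entry["done"] and entry["poke"]():
            entry["done"] = True  # the real impl would mark the task successful
    return [t for t, e in SENSOR_STATE.items() if e["done"]]

register_sensor("wait_for_file", lambda: True)
register_sensor("wait_for_partition", lambda: False)
print(smart_sensor_pass())  # ['wait_for_file']
```

The point of the design is that the per-sensor tasks run only briefly; the poking cost is amortized across one (or a few) long-running master tasks.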
On Fri, Jan 10, 2020 at 1:44 PM Alex Guziel <alex.guz...@airbnb.com.invalid> wrote:

> I feel like for this, we can incorporate the smart sensor we have
> implemented at Airbnb that we plan on open sourcing.
>
> The TL;DR is that it works by having the Sensor task run briefly and
> materialize some state into the DB, which master sensor tasks poke for.
> This can be with custom time intervals.
>
> On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish <dpstand...@gmail.com> wrote:
>
> > I also am a big fan of adding better support for stateful tasks, though
> > I know this is a thorny subject in the Airflow community.
> >
> > There are many data warehousing tasks where state makes a lot of sense.
> > While idempotence is a nice design pattern, it's not the solution for
> > every problem.
> >
> > XCom may not be the way, but there should be a way. Variables work, but
> > to me it makes sense to have a separate structure that is associated
> > with the task, or the dag, or the task instance.
> >
> > On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P. <
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > FYI, the design of the already discussed pull request would allow
> > > state to be persisted across retries:
> > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > > While I agree that in most cases you are correct, it would be greatly
> > > appreciated if the design for keeping state across reschedules did
> > > not explicitly exclude this capability.
> > >
> > > In lots of cases I already do exactly what you suggest: I scan the
> > > state of the target system and resume from there. However, in lengthy
> > > pipelines this becomes complex. For example, I have a pipeline that
> > > goes something like: FTP Download -> Decrypt File and Zip File ->
> > > Upload to Jump Host and remove Zip File -> Store in S3 Bucket.
> > > The data needs to be available at the end state as soon as possible,
> > > so the decryption operator is a sensor that is already running and
> > > waits for the file to be available, then immediately decrypts and
> > > zips the file; same for the upload operator. From inside the
> > > corporate network environment it's not possible to check the state of
> > > the S3 bucket, so the original FTP Download process cannot check the
> > > state of the final target system. Even if it could, this could lead
> > > to a race condition if the data is in transit.
> > >
> > > I guess in environments where you have a lot of control and aren't
> > > beholden to capricious policy, audit, and regulatory requirements,
> > > such scenarios must indeed seem niche :). Anyway, we have a solution;
> > > I'm just asking that you don't go out of your way to stop users from
> > > shooting themselves in the foot if they're really determined to.
> > >
> > > Damian
> > >
> > > -----Original Message-----
> > > From: Chris Palmer <ch...@crpalmer.com>
> > > Sent: Friday, January 10, 2020 13:37
> > > To: dev@airflow.apache.org
> > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > > Rescheduling in Operators
> > >
> > > I agree with Jarek that maintaining state between retries is not the
> > > right thing to do. To be honest, I'm not even convinced by the need
> > > for state between reschedules myself.
> > >
> > > While I know from past experience that FTP is a pain to deal with, I
> > > think that your example is a pretty niche one. Additionally, when
> > > thinking about idempotent task design, lots of tasks utilize state
> > > that exists in other systems. You should be thinking about what state
> > > you want some external system to be in after the task has run, rather
> > > than precisely what actions you want the task to do.
> > > It's the subtle difference between:
> > >
> > > "When it runs, this task should create the required table in my
> > > database" (by running a simple 'CREATE TABLE foobar .....')
> > >
> > > and
> > >
> > > "After this task has finished, the required table should exist in my
> > > database" (by running 'CREATE TABLE IF NOT EXISTS foobar .....')
> > >
> > > The first will fail if run repeatedly (without someone taking some
> > > other action, like deleting the table). The second can be run as many
> > > times as you want without error, but it relies on the state that is
> > > maintained by your database.
> > >
> > > In your case, the external state I think you should care about is the
> > > file system you are downloading the files to, as opposed to some
> > > external table that could get out of sync with the file system. So I
> > > would write the operator so that the first thing it does is compare
> > > the complete list with what already exists in the destination, and
> > > then only attempt to download the ones that are missing.
> > >
> > > Chris
> > >
> > > On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk <jarek.pot...@polidea.com>
> > > wrote:
> > >
> > > > I wonder what others think of it.
> > > >
> > > > On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
> > > > damian.sha...@credit-suisse.com> wrote:
> > > >
> > > > > I don't believe so; the default should be that state isn't
> > > > > preserved across retries, just that it's possible for the user to
> > > > > enable it if they are willing to take on that complexity.
> > > > >
> > > > > We have lots of operators that do this already, because if they
> > > > > fail part way through a job, the overhead of resuming from the
> > > > > beginning rather than having state on their progress is too much.
> > > > > It is just annoying that we have to keep this state outside
> > > > > Airflow, as that requires extra infrastructure for our task
> > > > > scheduling.
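Chris's compare-then-download pattern can be sketched as follows. This is a minimal, hypothetical illustration: `sync_missing` and the `fetch` stand-in are made-up names, not Airflow or ftplib APIs; the external state consulted on each run is simply what already exists at the destination.

```python
# Idempotent sync: on every run, compare the full remote list against what
# is already present locally, and fetch only the missing files. Rerunning
# after a crash naturally resumes without any stored progress table.

def sync_missing(remote_files, local_files, fetch):
    """Download only files not already present locally."""
    missing = sorted(set(remote_files) - set(local_files))
    for name in missing:
        fetch(name)
    return missing

downloaded = []
result = sync_missing(
    remote_files=["a.csv", "b.csv", "c.csv"],
    local_files=["b.csv"],       # already on disk from a previous run
    fetch=downloaded.append,     # stand-in for the actual FTP download
)
print(result)  # ['a.csv', 'c.csv']
```

Run it again with all three files local and it fetches nothing, which is exactly the "safe to rerun" property being argued for.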
> > > > > For example, we have an FTP site that we need to download 250
> > > > > files from; the full file list is provided to the operator. The
> > > > > FTP connection is very unreliable and the job often fails midway.
> > > > > On retry we don't want to resume from the beginning of the job,
> > > > > so we store the state of our progress in a table outside Airflow.
> > > > > We can't split the job into 250 tasks because the FTP site only
> > > > > accepts 1 connection at a time, so the overhead of 250 logins
> > > > > would add an hour to the process, and it would make the Airflow
> > > > > UI near unusable.
> > > >
> > > > I do not know all the details of course - but your case seems to be
> > > > solvable much more easily, and in the "Airflow" way. You can have a
> > > > custom operator that continues running until everything is
> > > > downloaded and retries failed transfers. The state of which files
> > > > are downloaded should be kept in memory, and even if an FTP
> > > > operation fails, it should retry each failed file rather than fail
> > > > the whole operator. That would keep it idempotent, and keep the
> > > > state in memory rather than in Airflow's DB or in an external
> > > > system. Even if you already have an operator that transfers X files
> > > > and you do not want to change it, you can likely wrap or extend it
> > > > to keep the list of files in memory and retry only those files that
> > > > have failed so far. IMHO, in your solution you do exactly what you
> > > > are not supposed to according to Airflow's design - unless you add
> > > > some extra logic and complexity, your operator is not idempotent.
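The in-memory retry approach Jarek sketches could look roughly like this. All names here are hypothetical; the flaky `download` stand-in simulates an unreliable FTP connection that drops on the first attempt per file.

```python
# Keep progress state in memory inside one long-running operator: retry only
# the files that failed, instead of persisting progress to an external table.

def download_all(files, download, max_rounds=5):
    """Retry failed files in-memory until all succeed or rounds run out."""
    pending = list(files)
    for _ in range(max_rounds):
        failed = []
        for name in pending:
            try:
                download(name)
            except OSError:
                failed.append(name)  # remember it; don't fail the whole task
        pending = failed
        if not pending:
            return True
    raise RuntimeError(f"still failing after {max_rounds} rounds: {pending}")

# Flaky stand-in: raises on the first attempt for every file, then succeeds.
attempts = {}
def flaky(name):
    attempts[name] = attempts.get(name, 0) + 1
    if attempts[name] == 1:
        raise OSError("connection dropped")

print(download_all(["f1", "f2", "f3"], flaky))  # True, after one retry round
```

If the whole task instance does fail and is retried, the in-memory state is gone and everything re-downloads, which is the idempotent behavior being argued for.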
> > > > For example - if you delete the downloaded files for whatever
> > > > reason, keep the external state, and run a backfill, I believe what
> > > > will happen (unless you have some extra logic) is that it will see
> > > > (from the external state) that the files were already downloaded
> > > > and will not download them again. If you use the in-memory state,
> > > > it will work as expected - next time you run it via backfill, it
> > > > will re-download all files.
> > > >
> > > > J.
> > > >
> > > > > Damian
> > > > >
> > > > > -----Original Message-----
> > > > > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > > > > Sent: Friday, January 10, 2020 11:45
> > > > > To: dev@airflow.apache.org
> > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > > > > Rescheduling in Operators
> > > > >
> > > > > Just opening the Pandora's box :).
> > > > >
> > > > > I personally think we should not keep state between retries. It
> > > > > opens up possibilities for a whole host of problems and edge
> > > > > cases, and allows people to solve some problems in non-Airflow'y
> > > > > ways - losing some important properties (mainly idempotency).
> > > > > Tasks in Airflow should be idempotent and stateless from the
> > > > > operator author's point of view.
> > > > >
> > > > > I think there is quite a big conceptual difference between keeping
> > > > > the reschedule state (it's just optimising execution of the same
> > > > > task) and keeping state between retries.
> > > > >
> > > > > Right now when you write your operator it's simple - no state to
> > > > > handle. XComs (and everything else) are cleared when a task is
> > > > > re-run. With the poke reschedule proposal, the only thing you can
> > > > > do is save/retrieve a single ID attached to the current task
> > > > > instance. This ID will not be cleared on reschedule, but it will
> > > > > be cleared on retry.
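The "single ID, kept across reschedules but cleared on retry" contract could be used by an operator roughly like this. It is a sketch with made-up helpers (`submit_job`, `job_state`, and the `state` dict), not the proposed Airflow API: the job is submitted once on the first poke, and later reschedules only check status.

```python
# Sketch: a poke that submits an external job once and then only monitors it.
# `state` models the single persisted ID: it survives reschedules and would
# be wiped on a retry, making the task idempotent per try.

def poke(state, submit_job, job_state):
    """Return True when the external job has finished."""
    if "job_id" not in state:            # first poke of this try: submit once
        state["job_id"] = submit_job()
    return job_state(state["job_id"]) == "SUCCEEDED"

submissions = []
def submit_job():
    submissions.append("job-1")          # would call the external service
    return "job-1"

status = {"job-1": "RUNNING"}
state = {}                               # survives reschedules, cleared on retry
print(poke(state, submit_job, lambda j: status[j]))  # False: still running
status["job-1"] = "SUCCEEDED"
print(poke(state, submit_job, lambda j: status[j]))  # True, no resubmission
print(submissions)                       # ['job-1'] - submitted exactly once
```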
> > > > > If we introduce saving state on retries, it opens up a lot of
> > > > > questions - should we keep all retries, or just one? What data
> > > > > should we keep - should we allow more structured data? What
> > > > > guidelines should people follow when writing their operators? It
> > > > > is a totally different feature that should be discussed
> > > > > separately.
> > > > >
> > > > > J.
> > > > >
> > > > > On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <
> > > > > damian.sha...@credit-suisse.com> wrote:
> > > > >
> > > > > > I just wanted to add a related use case: task retries. There
> > > > > > are lots of scenarios where keeping state between the retries
> > > > > > as well as the reschedules would be extremely helpful, so as
> > > > > > long as whatever the solution is isn't overly narrow I'd be
> > > > > > extremely appreciative.
> > > > > >
> > > > > > Damian
> > > > > >
> > > > > > -----Original Message-----
> > > > > > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > > > > > Sent: Friday, January 10, 2020 11:05
> > > > > > To: dev@airflow.apache.org
> > > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and
> > > > > > Poke Rescheduling in Operators
> > > > > >
> > > > > > Also, another point to discuss here. As the original author of
> > > > > > the idea of using a prefix in XCom, I think after the
> > > > > > discussions I changed my mind. I think that simply adding a
> > > > > > field to an existing table (TaskReschedule?) where we could
> > > > > > keep all the data that needs to be persisted seems to be a good
> > > > > > idea. We do not impact performance too much (the table is
> > > > > > already queried), we do not add too much complexity, and we do
> > > > > > not try to introduce a generic "state" storage - this would be
> > > > > > a solution dedicated to only handle rescheduling.
> > > > > > On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko
> > > > > > <fo...@driesprong.frl> wrote:
> > > > > >
> > > > > > > The repoke logic, as it is now implemented with the sensor,
> > > > > > > is able to recover from an unexpected crash. After each poke,
> > > > > > > it will just go to sleep. If the process crashes in between,
> > > > > > > it might become a zombie task in the end, but this is also
> > > > > > > taken care of by the scheduler. In this case, the scheduler
> > > > > > > thinks the task is still running, but in reality it crashed.
> > > > > > > There is a timeout that will reset the execution. Hopefully
> > > > > > > this doesn't happen often, and it should only occur when
> > > > > > > something is off (for example, a machine crashed, or a
> > > > > > > network partition, etc.). HTH
> > > > > > >
> > > > > > > Personally I don't like duplicating the same table for such a
> > > > > > > similar use case. But that's a design choice, I guess.
> > > > > > >
> > > > > > > If we go for the async executor, the above might be
> > > > > > > different. I think it is good not to include this in the
> > > > > > > discussion.
> > > > > > >
> > > > > > > Cheers, Fokko
> > > > > > >
> > > > > > > Op do 9 jan. 2020 om 19:33 schreef Darren Weber <
> > > > > > > dweber.consult...@gmail.com>:
> > > > > > >
> > > > > > > > Not sure whether to add to this email thread or the
> > > > > > > > google-doc (not sure if that google-doc is just
> > > > > > > > meeting-notes or if it should evolve into a spec :grin:).
> > > > > > > > Maybe a stupid suggestion, but here it is anyway:
> > > > > > > >
> > > > > > > > XCom - communication between elements of a DAG
> > > > > > > >
> > > > > > > > XState - key/value store available for each element of a DAG
> > > > > > > >
> > > > > > > > Clearly separate the behavior of a stateful resource
> > > > > > > > (XState) from one that is not intended to be stateful
> > > > > > > > (XCom), if that makes any sense? (Creating a new XState
> > > > > > > > feature is similar to a new db-table, I guess.)
> > > > > > > >
> > > > > > > > Just to explain what I understand about the goals of how
> > > > > > > > Airflow should behave when it has some ability for an
> > > > > > > > operator to reschedule pokes, and the scope of the changes.
> > > > > > > > In the big picture, it's important that Airflow can
> > > > > > > > resurrect a DAG on a restart when some elements of the DAG
> > > > > > > > contain operators/sensors that are dependent on external
> > > > > > > > cloud operations (e.g. AWS Batch). This is feasible when
> > > > > > > > Airflow can persist any unique job-ID defined by the
> > > > > > > > external job provider (e.g. the AWS Batch "jobId") and any
> > > > > > > > related identifiers for the job (e.g. AWS Batch
> > > > > > > > infrastructure ARNs for the batch queue/compute-env etc. -
> > > > > > > > all of this detail is captured in the AwsBatchOperator
> > > > > > > > already). Assuming Airflow runs a DAG that spins up 100's
> > > > > > > > or 1000's of such external jobs and persists the external
> > > > > > > > "jobId", when Airflow crashes or is stopped for upgrades
> > > > > > > > etc. and restarted, the operators that submitted the jobs
> > > > > > > > should be able to try to check on the state of those
> > > > > > > > previously submitted jobs.
> > > > > > > > If the jobs are still running on the external provider
> > > > > > > > (e.g. AWS Batch), it should be able to resume monitoring
> > > > > > > > (poking) the job status without re-submitting a duplicate
> > > > > > > > job (also, any failure to poke a job should have some level
> > > > > > > > of poke-retry behavior that does not immediately fail the
> > > > > > > > Airflow task, as that would result in somehow re-submitting
> > > > > > > > the same job that is already running). So, in that context,
> > > > > > > > what is the scope of the "reschedule-poke" changes - do
> > > > > > > > they simply release a worker, so that as long as Airflow is
> > > > > > > > "up" (has not crashed) the reschedule can resume poking,
> > > > > > > > but if Airflow crashes, the whole thing starts over again
> > > > > > > > because the state of the task is not resilient to Airflow
> > > > > > > > crashing? Or does the work on the "reschedule-poke" also
> > > > > > > > provide resilience when Airflow crashes? If the goal is to
> > > > > > > > be resilient to Airflow crashes, what is required for the
> > > > > > > > "reschedule-poke" work to accomplish that goal, if it
> > > > > > > > doesn't already? (Would the architecture for Airflow
> > > > > > > > resilience be out-of-scope in this context because it
> > > > > > > > involves more complexity, like a Kafka cluster?)
> > > > > > > >
> > > > > > > > -- Darren
> > > > > > > >
> > > > > > > > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk
> > > > > > > > <jarek.pot...@polidea.com> wrote:
> > > > > > > >
> > > > > > > > > Commented as well. I think we are really going in a good
> > > > > > > > > direction!
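The resume-instead-of-resubmit behavior Darren asks about might be sketched like this. The registry dict and the helpers are stand-ins for whatever durable store and provider calls would actually be used; nothing here is the AWS Batch or Airflow API.

```python
# Toy sketch of crash resilience: before submitting an external job, look up
# a persisted job ID for this (dag_id, task_id, run) key and, if that job is
# still alive on the provider, resume watching it instead of launching a
# duplicate.

REGISTRY = {}  # durable (dag_id, task_id, run_id) -> external job id

def submit_or_resume(key, submit, describe):
    """Return (job_id, submitted_now); resume an existing live job if any."""
    job_id = REGISTRY.get(key)
    if job_id is not None and describe(job_id) in ("RUNNING", "SUCCEEDED"):
        return job_id, False             # resume monitoring; do not resubmit
    job_id = submit()
    REGISTRY[key] = job_id
    return job_id, True

jobs = {"batch-123": "RUNNING"}          # stand-in for the provider's view
key = ("my_dag", "run_batch", "2020-01-10")
jid, submitted = submit_or_resume(key, lambda: "batch-123", jobs.get)
print(jid, submitted)                    # batch-123 True  (first submission)
jid, submitted = submit_or_resume(key, lambda: "batch-456", jobs.get)
print(jid, submitted)                    # batch-123 False (resumed after "restart")
```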
> > > > > > > > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko
> > > > > > > > > <fo...@driesprong.frl> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Jacob for building the document. I think we're
> > > > > > > > > > on the right track. I've added some comments and
> > > > > > > > > > clarification to the document, to validate we're
> > > > > > > > > > looking in the same direction. Would love to get more
> > > > > > > > > > people's opinion on this.
> > > > > > > > > >
> > > > > > > > > > Cheers, Fokko
> > > > > > > > > >
> > > > > > > > > > Op wo 8 jan. 2020 om 03:31 schreef Jacob Ferriero
> > > > > > > > > > <jferri...@google.com.invalid>:
> > > > > > > > > >
> > > > > > > > > > > Image not working on dev list; here is a link to the
> > > > > > > > > > > github review comment containing said image:
> > > > > > > > > > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <
> > > > > > > > > > > jferri...@google.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Hello Dev List,
> > > > > > > > > > >>
> > > > > > > > > > >> The inspiration for this is to allow operators to
> > > > > > > > > > >> start a long running task on an external system and
> > > > > > > > > > >> reschedule pokes for completion (e.g. a spark job on
> > > > > > > > > > >> dataproc), instead of blocking a worker (sketched
> > > > > > > > > > >> out in #6210
> > > > > > > > > > >> <https://github.com/apache/airflow/pull/6210>), to
> > > > > > > > > > >> allow freeing up of slots between pokes.
> > > > > > > > > > >> To do this requires supporting a method for storing
> > > > > > > > > > >> task state between reschedules. It's worth noting
> > > > > > > > > > >> that a task would maintain state only during
> > > > > > > > > > >> reschedules but clear state on retries. In this way
> > > > > > > > > > >> the task is idempotent before reaching a terminal
> > > > > > > > > > >> state [SUCCESS, FAIL, UP_FOR_RETRY]. This brings up
> > > > > > > > > > >> a question of the scope of commitment to idempotency
> > > > > > > > > > >> of operators. If it is deemed acceptable for
> > > > > > > > > > >> reschedules to maintain some state, then we can free
> > > > > > > > > > >> up workers between pokes.
> > > > > > > > > > >>
> > > > > > > > > > >> Because this is very similar to the purpose of XCom,
> > > > > > > > > > >> it's been postulated that we should support this
> > > > > > > > > > >> behavior in XCom rather than provide a new model in
> > > > > > > > > > >> the db for TaskState. (Though discussion here on
> > > > > > > > > > >> which is more appropriate is more than welcome.)
> > > > > > > > > > >>
> > > > > > > > > > >> I'd like to put forward a proposal to resurrect the
> > > > > > > > > > >> reverted #6370
> > > > > > > > > > >> <https://github.com/apache/airflow/pull/6370> in
> > > > > > > > > > >> order to provide a modification to the lifetime of
> > > > > > > > > > >> XComs under certain conditions. The diagram below
> > > > > > > > > > >> helps illustrate the change originally proposed in
> > > > > > > > > > >> #6370.
> > > > > > > > > > >> There was concern about changing existing behavior
> > > > > > > > > > >> (potentially breaking) and the fact that this makes
> > > > > > > > > > >> operators stateful. Per the review comments and an
> > > > > > > > > > >> informal discussion (meeting notes <
> > > > > > > > > > >> https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0osesPG7jVZ3oU/edit#
> > > > > > > > > > >> > and #sig-async-operators), I'd like to modify the
> > > > > > > > > > >> approach of #6370 to only skip clearing of an XCom
> > > > > > > > > > >> if the XCom key is prefixed with
> > > > > > > > > > >> `airflow.models.xcom.DO_NOT_CLEAR_PREFIX =
> > > > > > > > > > >> "_STATEFUL_"` or similar.
> > > > > > > > > > >>
> > > > > > > > > > >> [image: image.png]
> > > > > > > > > > >>
> > > > > > > > > > >> *Jacob Ferriero*
> > > > > > > > > > >> Strategic Cloud Engineer: Data Engineering
> > > > > > > > > > >> jferri...@google.com
> > > > > > > > >
> > > > > > > > > Jarek Potiuk
> > > > > > > > > Polidea | Principal Software Engineer
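The prefix rule Jacob proposes could behave roughly as below. This is a toy model of the idea, not the reverted #6370 code; only the `DO_NOT_CLEAR_PREFIX = "_STATEFUL_"` name comes from the proposal. Keys with the prefix survive a reschedule, while a retry clears everything, matching the "idempotent before a terminal state" framing above.

```python
# Toy model of prefix-scoped XCom clearing: on reschedule, keep only keys
# carrying the stateful prefix; on retry, clear everything.

DO_NOT_CLEAR_PREFIX = "_STATEFUL_"

def clear_xcoms(xcoms, clearing_for="retry"):
    """Return the XCom entries that survive the given kind of clearing."""
    if clearing_for == "reschedule":
        return {k: v for k, v in xcoms.items()
                if k.startswith(DO_NOT_CLEAR_PREFIX)}
    return {}  # retries clear everything, keeping tasks idempotent per try

xcoms = {"result": 42, "_STATEFUL_job_id": "job-9"}
print(clear_xcoms(xcoms, "reschedule"))  # {'_STATEFUL_job_id': 'job-9'}
print(clear_xcoms(xcoms, "retry"))       # {}
```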
> > > > > > > > Darren L. Weber, Ph.D.
> > > > > > > > http://psdlw.users.sourceforge.net/