FYI the design of the already discussed pull would allow state to be persisted 
across retries: 
https://github.com/apache/airflow/pull/6370#issuecomment-546582724 While I 
agree in most cases you are correct I would however be greatly appreciated to 
not explicitly exclude this capability in the design of keeping state across 
reschedules. 

In lost of cases I already do exactly what you suggest, I scan the state of the 
target system and resume from there. However in lengthy pipelines this becomes 
complex, for example I have a pipeline that goes something like:   FTP Download 
-> Decrypt File and Zip File -> Upload to Jump Host and remove Zip File -> 
Store in S3 Bucket.

The data needs to be available at the end state as soon as possible so the 
decryption operator is a sensor that is already running and waits for the file 
to be available and immediately decrypts and zips the file, same for the upload 
operator. From inside the corporate network environment it's not possible to 
check the state of the s3 bucket so the orriginal FTP Download process can not 
check the state of the final final target system. Even if it was this could 
lead to a race condition if the data is in transit.

I guess in environments where you have a lot of control and aren't beholden to 
capracious policy, audit, and regulatory requirements such scenarios must 
indeed seem niche :). Anyway we have a soluton, just asking you don't go out of 
your way to stop users from shooting themselves in the foot if they're really 
determined to.

Damian 

-----Original Message-----
From: Chris Palmer <ch...@crpalmer.com> 
Sent: Friday, January 10, 2020 13:37
To: dev@airflow.apache.org
Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling 
in Operators

I agree with Jarek that maintaining state between retries is not the right 
thing to do. To be honest I'm not even convinced by the need for state between 
reschedules myself.

While I know from past experience that FTP is a pain to deal with, I think that 
your example is a pretty niche one. Additionally, when thinking about 
idempotent task design, lots of tasks utilize state that exists in other 
systems. You should be thinking about what state you want some external system 
to be in after the task has run, rather than precisely what actions you want 
the task to do.

It's the subtle difference between:

"When it runs, this task should create the required table in my database"
(by running a simple 'CREATE TABLE foobar .....')

and

"After this tasks has finished, the required table should exist in my database" 
(by running 'CREATE TABLE IF NOT EXISTS foobar .....')


The first will fail if run repeatedly (without someone taking some other action 
like deleting the table). The second can be run as many times as you want 
without error, but it relies on the state that is maintained by your database.

In your case the external state I think you should care about is the file 
system you are downloading the files to, as opposed to some external table that 
could get out of sync with the file system. So I would write the operator so 
that the first thing it does is compare the complete list with what already 
exists in the destination, and then only attempt to download the ones that are 
missing.

Chris

On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk <jarek.pot...@polidea.com>
wrote:

> I wonder what others think of it.
>
> On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. < 
> damian.sha...@credit-suisse.com> wrote:
>
> > I don't believe so, the default should be that state isn't preserved 
> > across retries, just that it's possible for the user to enable it if 
> > they are willing to take on that complexity.
>
>
> > We have lots of operators that do this already as if they fail part 
> > way through a job the overhead of resuming from the beginning rather 
> > than having state on their progress is too much, just annoying we 
> > have to keep this state outside Airflow as it requires extra 
> > infrastructure for our
> task
> > scheduling.
> >
> > For example we have an FTP site that we need to download 250 files 
> > from, the full file list is provided to the operator, the FTP 
> > connection is
> very
> > unreliable and the job often fails midway, on retry we don't want to
> resume
> > from the beginning of the job so we store the state of our progress 
> > in a table outside Airflow. We can't split the job in to 250 tasks 
> > because the FTP site only accepts 1 connection at a time so the 
> > overhead of 250
> logins
> > would add an hour to the process and it would make the Airflow UI 
> > near unusable.
> >
>
> I do not know all the details of course - but your case seems to be 
> solvable much easier and in "Airflow" way. You can have custom 
> operator that continues running until everything is downloaded and 
> retries failed transfer. The state of which file is downloaded should 
> be kept in memory and even if FTP operation fails, it should retry 
> each failed file rather than fail the whole operator.  That would keep 
> it idempotent, and keep the state in memory rather than in Airflow's 
> DB or in external system. Even if you already have an operator that 
> transfers X files already and you do not want to change it, you can 
> likely wrap it/extend to keep list of files in memory and retry only 
> those files that failed so far. IMHO In your solution you do exactly 
> what you are not supposed to according to Airflow's design - unless 
> you do some extra logic and complexity your operator is not idempotent.
>
> For example - If you delete downloaded files for whatever reason and 
> keep the external state and run backfill, I believe what will happen 
> (unless you have some extra logic) it will see (from external state) 
> that the files were already downloaded and will not download them 
> again. If you use the in-memory state, it will work as expected - next 
> time you run it via back-fill,  it will re-download all files.
>
> J.
>
>
> > Damian
> >
> > -----Original Message-----
> > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > Sent: Friday, January 10, 2020 11:45
> > To: dev@airflow.apache.org
> > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke 
> > Rescheduling in Operators
> >
> > Just opening the Pandora's box :).
> >
> > I personally think we should not keep state between retries. It 
> > opens up possibilities to whole host of problems and edge cases, and 
> > allows people to solve some approaches in non-airflow'y ways - 
> > losing some important properties (mainly idempotency). Tasks in 
> > Airflow should be idempotent
> and
> > stateless from the operator's author point of view).
> >
> > I think there is quite a big conceptual difference between keeping 
> > the reschedule state (it's just optimising of execution of the same 
> > task) and keeping state between retries.
> >
> > Right now when you write your operator it's simple - no state to handle.
> > XComs (and everything else) is cleared when task is re-run.
> > With Poke reschedule proposal - the only thing you can do is to 
> > save/retrieve a single ID attached to the current task instance. 
> > This id will not be cleared on reschedule, but it will be cleared on retry.
> >
> > If we introduce saving state on retries, it opens up a lot of 
> > questions - should we keep all retries? or just one? What data 
> > should we keep -
> should
> > we allow more structured data? What guidelines should people follow 
> > when writing their operators ? And it's a totally different feature 
> > that
> should
> > be discussed separately.
> >
> > J.
> >
> >
> > On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. < 
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > I just wanted to add a related use case is task retries, there are 
> > > lots of scenarios where keeping state between the retries as well 
> > > as the reschedules would be extremely helpful, so as long as 
> > > whatever the solution is isn't overly narrow I'd be extremely 
> > > appreciative.
> > >
> > > Damian
> > >
> > > -----Original Message-----
> > > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > > Sent: Friday, January 10, 2020 11:05
> > > To: dev@airflow.apache.org
> > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke 
> > > Rescheduling in Operators
> > >
> > > Also another point to discuss here. As an original author of the 
> > > idea of using prefix in xcom, I think after the discussions I 
> > > changed my mind. I think that simply adding a field to an existing 
> > > table
> > > (TaskReschedule?) where we could keep all the data that need to be 
> > > persisted, seems to be a good idea. We do not impact performance 
> > > too much (the table is already
> > > queried) , we do not add too much complexity and we do not try to 
> > > introduce a generic "state" storage - this would be a solution 
> > > dedicated to only handle rescheduling.
> > >
> > > On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko 
> > > <fo...@driesprong.frl>
> > > wrote:
> > >
> > > > The repoke logic as it is now implemented with the sensor, is 
> > > > able to recover from an unexpected crash. After each poke, it 
> > > > will just go to sleep. If the process crashes in between, it 
> > > > might become a zombie task in the end, but this is also taken 
> > > > care of by the scheduler. In this case, the scheduler thinks the 
> > > > task is still running, but in
> > > reality, it crashes.
> > > > There is a timeout that will reset the execution. Hopefully, 
> > > > this doesn't happen often, and should only occur when something 
> > > > is off (for example a machine crashed, or a network partition, 
> > > > etc). HTH
> > > >
> > > > Personally I don't like duplicating the same table for such a 
> > > > similar use case. But that's a design choice I guess.
> > > >
> > > > If we go for the async executor, the above might be different. I 
> > > > think it is good to not include this in the discussion.
> > > >
> > > > Cheers, Fokko
> > > >
> > > >
> > > > Op do 9 jan. 2020 om 19:33 schreef Darren Weber < 
> > > > dweber.consult...@gmail.com
> > > > >:
> > > >
> > > > > Not sure whether to add to this email thread or the google-doc 
> > > > > (not sure
> > > > if
> > > > > that google-doc is just a meeting-notes or if it should evolve 
> > > > > into a
> > > > spec
> > > > > :grin:).
> > > > >
> > > > > Maybe a stupid suggestion, but here it is anyway:
> > > > >
> > > > > XCom - communication between elements of a DAG
> > > > >
> > > > > XState - key/value store available for each element of a DAG
> > > > >
> > > > > Clearly separate the behavior of a stateful resource (XState) 
> > > > > from one
> > > > that
> > > > > is not intended to be stateful (XCom), if that makes any sense?
> > > > (Creating
> > > > > a new XState feature is similar to a new db-table, I guess.)
> > > > >
> > > > > Just to explain what I understand about the goals of how 
> > > > > Airflow should behave when it has some ability for an operator 
> > > > > to reschedule pokes and
> > > > the
> > > > > scope of the changes.  In the big picture, it's important that 
> > > > > Airflow
> > > > can
> > > > > resurrect a DAG on a restart when some elements of the DAG 
> > > > > contain operators/sensors that are dependent on external cloud 
> > > > > operations
> > (e.g.
> > > > AWS
> > > > > Batch).  This is feasible when Airflow can persist any unique 
> > > > > job-ID defined by the external job provider (e.g. AWS Batch
> > > > > "jobId") and any related identifiers for the job (e.g. AWS 
> > > > > Batch infrastructure ARNs for batch queue/compute-env etc and 
> > > > > all of this detail is captured in the AwsBatchOperator already).
> > > > > Assuming Airflow runs a DAG that spins up
> > > > 100's
> > > > > or 1000's of such external jobs and persists the external 
> > > > > "jobId", when Airflow crashes or is stopped for upgrades etc. 
> > > > > and restarted, the operators that submitted the jobs should be 
> > > > > able to try to check on the state of those previously 
> > > > > submitted jobs.  If the jobs are still running
> > > > on
> > > > > the external provider (e.g. AWS Batch), it should be able to 
> > > > > resume monitoring (poking) the job status without 
> > > > > re-submitting a duplicate job (also any failure to poke a job 
> > > > > should have some level of poke-retry behavior that does not 
> > > > > immediately fail the Airflow task that results in somehow 
> > > > > re-submitting the same job that is already running).  So, in 
> > > > > that context, what is the scope of the "reshedule-poke" 
> > > > > changes - do they
> > > > simply
> > > > > release a worker and so long as Airflow is "up" (has not 
> > > > > crashed), the reschedule can resume poking, but if Airflow 
> > > > > crashes, the whole thing starts over again because the state 
> > > > > of the task is not resilient to
> > > > Airflow
> > > > > crashing?  Or, does the work on the "reschedule-poke" also 
> > > > > provide resilience when Airflow crashes?  If the goal is to be 
> > > > > resilient to
> > > > Airflow
> > > > > crashes, what is required for the "reschedule-poke" work to 
> > > > > accomplish
> > > > that
> > > > > goal, if it doesn't already?  (Would the architecture for 
> > > > > Airflow resilience be out-of-scope in this context because it 
> > > > > involves more complexity, like a Kafka cluster?)
> > > > >
> > > > > -- Darren
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk 
> > > > > <jarek.pot...@polidea.com>
> > > > > wrote:
> > > > >
> > > > > > Commented as well. I think we are really going in a good
> direction!
> > > > > >
> > > > > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko 
> > > > > > <fo...@driesprong.frl
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Jacob for building the document. I think we're on 
> > > > > > > the right
> > > > > track.
> > > > > > > I've added some comments and clarification to the 
> > > > > > > document, to
> > > > validate
> > > > > > > we're looking in the same direction. Would love to get 
> > > > > > > more people's opinion on this.
> > > > > > >
> > > > > > > Cheers, Fokko
> > > > > > >
> > > > > > > Op wo 8 jan. 2020 om 03:31 schreef Jacob Ferriero
> > > > > > > <jferri...@google.com.invalid>:
> > > > > > >
> > > > > > > > Image not working on dev list here is link to the github 
> > > > > > > > review
> > > > > comment
> > > > > > > > containing said image:
> > > > > > > > https://github.com/apache/airflow/pull/6370#issuecomment
> > > > > > > > -546
> > > > > > > > 58
> > > > > > > > 2724
> > > > .
> > > > > > > >
> > > > > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <
> > > > jferri...@google.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hello Dev List,
> > > > > > > >>
> > > > > > > >> The inspiration for this is to allow operators to start 
> > > > > > > >> a long
> > > > > running
> > > > > > > >> task on an external system and reschedule pokes for 
> > > > > > > >> completion
> > > > (e.g
> > > > > > > spark
> > > > > > > >> job on dataproc), instead of blocking a worker 
> > > > > > > >> (sketched out in
> > > > > #6210
> > > > > > > >> <https://github.com/apache/airflow/pull/6210>) to allow 
> > > > > > > >> freeing
> > > > up
> > > > > of
> > > > > > > >> slots between pokes. To do this requires supporting a 
> > > > > > > >> method for
> > > > > > storing
> > > > > > > >> task state between reschedules.
> > > > > > > >> It's worth noting that a task would maintain state only 
> > > > > > > >> during reschedules but clear state on retries. In this 
> > > > > > > >> way the task is
> > > > > > > idempotent
> > > > > > > >> before reaching a terminal state [SUCCES, FAIL,
> UP_FOR_RETRY].
> > > > This
> > > > > > > brings
> > > > > > > >> up a question of the scope of commitment to idempotency 
> > > > > > > >> of
> > > > > operators.
> > > > > > > If it
> > > > > > > >> is deemed acceptable for reschedules to maintain some 
> > > > > > > >> state, then
> > > > we
> > > > > > can
> > > > > > > >> free up workers between pokes.
> > > > > > > >>
> > > > > > > >> Because this is very similar to the purpose of XCom 
> > > > > > > >> it's been
> > > > > > postulated
> > > > > > > >> that we should support this behavior in XCom rather 
> > > > > > > >> than provide a
> > > > > new
> > > > > > > >> model in the db for TaskState. (Though discussion here 
> > > > > > > >> on which is
> > > > > > more
> > > > > > > >> appropriate is more than welcome.)
> > > > > > > >>
> > > > > > > >> I'd like to put forward a proposal to resurrect the 
> > > > > > > >> reverted
> > > > > > > >> #6370 <https://github.com/apache/airflow/pull/6370> in 
> > > > > > > >> order to
> > > > provide a
> > > > > > > >> modification to the lifetime of XComs under certain
> > conditions.
> > > > The
> > > > > > > diagram
> > > > > > > >> below helps illustrate the change originally proposed 
> > > > > > > >> in
> > #6370.
> > > > > There
> > > > > > > was
> > > > > > > >> concern about changing existing behavior (potentially
> > > > > > > >> breaking)
> > > > and
> > > > > > the
> > > > > > > >> fact that this makes operators stateful. Per the review 
> > > > > > > >> comments
> > > > and
> > > > > > an
> > > > > > > >> informal discussion (meetings notes <
> > > > > > >
> > > > > >
> > > > >
> > > > https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6
> > > > Z0os
> > > > es
> > > > PG7jVZ3oU/edit#
> > > > > > > >
> > > > > > > >> and #sig-async-operators) I'd like to modify the 
> > > > > > > >> approach
> > > > > > > >> #6370 to
> > > > > > only
> > > > > > > >> skip clearing of XCom if the Xom key is prefixed with 
> > > > > > > >> `airflow.models.xcom.DO_NOT_CLEAR_PREFIX = 
> > > > > > > >> "_STATEFUL_"` or
> > > > similar.
> > > > > > > >>
> > > > > > > >> [image: image.png]
> > > > > > > >> --
> > > > > > > >>
> > > > > > > >> *Jacob Ferriero*
> > > > > > > >>
> > > > > > > >> Strategic Cloud Engineer: Data Engineering
> > > > > > > >>
> > > > > > > >> jferri...@google.com
> > > > > > > >>
> > > > > > > >> 617-714-2509 <(617)%20714-2509>
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > >
> > > > > > > > *Jacob Ferriero*
> > > > > > > >
> > > > > > > > Strategic Cloud Engineer: Data Engineering
> > > > > > > >
> > > > > > > > jferri...@google.com
> > > > > > > >
> > > > > > > > 617-714-2509
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Jarek Potiuk
> > > > > > Polidea <https://www.polidea.com/> | Principal Software 
> > > > > > Engineer
> > > > > >
> > > > > > M: +48 660 796 129 <+48660796129>
> > > > > > [image: Polidea] <https://www.polidea.com/>
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Darren L. Weber, Ph.D.
> > > > > http://psdlw.users.sourceforge.net/
> > > > > http://psdlw.users.sourceforge.net/wordpress/
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Jarek Potiuk
> > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > >
> > > M: +48 660 796 129 <+48660796129>
> > > [image: Polidea] <https://www.polidea.com/>
> > >
> > >
> > >
> > > ==================================================================
> > > ====
> > > =========
> > >
> > > Please access the attached hyperlink for an important electronic 
> > > communications disclaimer:
> > > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> > > ==================================================================
> > > ====
> > > =========
> > >
> > >
> >
> >
> > --
> >
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> >
> > M: +48 660 796 129 <+48660796129>
> > [image: Polidea] <https://www.polidea.com/>
> >
> >
> >
> >
> ======================================================================
> =========
> >
> > Please access the attached hyperlink for an important electronic 
> > communications disclaimer:
> > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> >
> ======================================================================
> =========
> >
> >
>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
> [image: Polidea] <https://www.polidea.com/>
>



=============================================================================== 
Please access the attached hyperlink for an important electronic communications 
disclaimer: 
http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html 
=============================================================================== 

Reply via email to