I am also a big fan of adding better support for stateful tasks, though I
know this is a thorny subject in the Airflow community.

There are many data warehousing tasks where state makes a lot of sense.
While idempotence is a nice design pattern, it's not the solution for every
problem.

XCom may not be the way, but there should be a way. Variables work, but to
me it makes sense to have a separate structure that is associated with the
task, the DAG, or the task instance.
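
To make the idea concrete, here is a minimal sketch of what such a separate
structure could look like. Everything in it is hypothetical: Airflow has no
TaskStateStore class, and a real version would be backed by a metadata DB
table rather than a dict. It only shows state scoped to a DAG, a task, or a
single task instance (identified here by an assumed run_id).

```python
from collections import defaultdict
from typing import Any, Dict, Optional, Tuple

# Hypothetical scope key: (dag_id, task_id, run_id).
# task_id=None means DAG-level state; run_id=None means task-level state.
ScopeKey = Tuple[str, Optional[str], Optional[str]]


class TaskStateStore:
    """In-memory stand-in for a per-task key/value state table."""

    def __init__(self) -> None:
        self._rows: Dict[ScopeKey, Dict[str, Any]] = defaultdict(dict)

    def set(self, key: str, value: Any, *, dag_id: str,
            task_id: Optional[str] = None,
            run_id: Optional[str] = None) -> None:
        self._rows[(dag_id, task_id, run_id)][key] = value

    def get(self, key: str, default: Any = None, *, dag_id: str,
            task_id: Optional[str] = None,
            run_id: Optional[str] = None) -> Any:
        # .get() on the outer dict avoids creating empty scopes on reads.
        return self._rows.get((dag_id, task_id, run_id), {}).get(key, default)


store = TaskStateStore()
store.set("last_loaded_partition", "2020-01-09",
          dag_id="warehouse_load", task_id="load_facts")

task_level = store.get("last_loaded_partition",
                       dag_id="warehouse_load", task_id="load_facts")
dag_level = store.get("last_loaded_partition", dag_id="warehouse_load")
```

State written at task scope is not visible at DAG scope, which is the
separation from Variables that I have in mind.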


On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P. <
damian.sha...@credit-suisse.com> wrote:

> FYI the design of the already discussed pull request would allow state to
> be persisted across retries:
> https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> While I agree that in most cases you are correct, I would greatly
> appreciate it if the design for keeping state across reschedules did not
> explicitly exclude this capability.
>
> In lots of cases I already do exactly what you suggest: I scan the state
> of the target system and resume from there. However, in lengthy pipelines
> this becomes complex. For example, I have a pipeline that goes something
> like: FTP Download -> Decrypt File and Zip File -> Upload to Jump Host
> and remove Zip File -> Store in S3 Bucket.
>
> The data needs to be available at the end state as soon as possible, so the
> decryption operator is a sensor that is already running and waits for the
> file to be available and immediately decrypts and zips the file; same for
> the upload operator. From inside the corporate network environment it's not
> possible to check the state of the S3 bucket, so the original FTP Download
> process cannot check the state of the final target system. Even if
> it could, this could lead to a race condition if the data is in transit.
>
> I guess in environments where you have a lot of control and aren't
> beholden to capricious policy, audit, and regulatory requirements, such
> scenarios must indeed seem niche :). Anyway, we have a solution; I'm just
> asking that you don't go out of your way to stop users from shooting
> themselves in the foot if they're really determined to.
>
> Damian
>
> -----Original Message-----
> From: Chris Palmer <ch...@crpalmer.com>
> Sent: Friday, January 10, 2020 13:37
> To: dev@airflow.apache.org
> Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> Rescheduling in Operators
>
> I agree with Jarek that maintaining state between retries is not the right
> thing to do. To be honest I'm not even convinced by the need for state
> between reschedules myself.
>
> While I know from past experience that FTP is a pain to deal with, I think
> that your example is a pretty niche one. Additionally, when thinking about
> idempotent task design, lots of tasks utilize state that exists in other
> systems. You should be thinking about what state you want some external
> system to be in after the task has run, rather than precisely what actions
> you want the task to do.
>
> It's the subtle difference between:
>
> "When it runs, this task should create the required table in my database"
> (by running a simple 'CREATE TABLE foobar .....')
>
> and
>
> "After this task has finished, the required table should exist in my
> database" (by running 'CREATE TABLE IF NOT EXISTS foobar .....')
>
>
> The first will fail if run repeatedly (without someone taking some other
> action like deleting the table). The second can be run as many times as you
> want without error, but it relies on the state that is maintained by your
> database.
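
The difference is easy to reproduce. Below is a small sketch using Python's
built-in sqlite3 module with an in-memory database; the column definition is
made up for illustration, since the statements quoted above elide it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")


def create_table_naive(conn: sqlite3.Connection) -> None:
    # "This task should create the table": fails if the table is already there.
    conn.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY)")


def ensure_table_exists(conn: sqlite3.Connection) -> None:
    # "After this task, the table should exist": safe to repeat.
    conn.execute("CREATE TABLE IF NOT EXISTS foobar (id INTEGER PRIMARY KEY)")


create_table_naive(conn)
try:
    create_table_naive(conn)  # second run of the non-idempotent version
    second_run_failed = False
except sqlite3.OperationalError:
    second_run_failed = True

# The idempotent version can run any number of times without error.
ensure_table_exists(conn)
ensure_table_exists(conn)

table_count = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'foobar'"
).fetchone()[0]
```

Note that the idempotent version keeps no state of its own; it relies
entirely on the database to say whether the table exists.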
>
> In your case the external state I think you should care about is the file
> system you are downloading the files to, as opposed to some external table
> that could get out of sync with the file system. So I would write the
> operator so that the first thing it does is compare the complete list with
> what already exists in the destination, and then only attempt to download
> the ones that are missing.
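
A rough sketch of that comparison step, assuming the destination is a local
directory and that a file counts as done once a file of that name exists
there. The function name and the sample file names are invented for the
example.

```python
import os
import tempfile
from typing import Iterable, List


def files_still_needed(expected: Iterable[str], dest_dir: str) -> List[str]:
    """Return the expected file names that are not yet in dest_dir."""
    present = set(os.listdir(dest_dir))
    return [name for name in expected if name not in present]


with tempfile.TemporaryDirectory() as dest:
    # Pretend an earlier attempt already fetched a.csv before failing.
    open(os.path.join(dest, "a.csv"), "w").close()
    todo = files_still_needed(["a.csv", "b.csv", "c.csv"], dest)
```

A partially written file would be treated as complete by this check, so a
real operator would probably download to a temporary name and rename on
success.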
>
> Chris
>
> On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk <jarek.pot...@polidea.com>
> wrote:
>
> > I wonder what others think of it.
> >
> > On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > I don't believe so, the default should be that state isn't preserved
> > > across retries, just that it's possible for the user to enable it if
> > > they are willing to take on that complexity.
> > >
> > > We have lots of operators that do this already, because if they fail
> > > partway through a job, the overhead of resuming from the beginning
> > > rather than keeping state on their progress is too much. It's just
> > > annoying that we have to keep this state outside Airflow, as it
> > > requires extra infrastructure for our task scheduling.
> > >
> > > For example, we have an FTP site that we need to download 250 files
> > > from. The full file list is provided to the operator, the FTP
> > > connection is very unreliable, and the job often fails midway. On
> > > retry we don't want to resume from the beginning of the job, so we
> > > store the state of our progress in a table outside Airflow. We can't
> > > split the job into 250 tasks because the FTP site only accepts 1
> > > connection at a time, so the overhead of 250 logins would add an hour
> > > to the process and it would make the Airflow UI near unusable.
> > >
> >
> > I do not know all the details of course - but your case seems to be
> > solvable much more easily and in an "Airflow" way. You can have a custom
> > operator that continues running until everything is downloaded and
> > retries failed transfers. The state of which files have been downloaded
> > should be kept in memory, and even if an FTP operation fails, it should
> > retry each failed file rather than fail the whole operator. That would
> > keep it idempotent, and keep the state in memory rather than in
> > Airflow's DB or in an external system. Even if you already have an
> > operator that transfers X files and you do not want to change it, you
> > can likely wrap/extend it to keep the list of files in memory and retry
> > only those files that have failed so far. IMHO, in your solution you do
> > exactly what you are not supposed to do according to Airflow's design -
> > without extra logic and complexity your operator is not idempotent.
> >
> > For example - if you delete the downloaded files for whatever reason,
> > keep the external state, and run a backfill, I believe that (unless you
> > have some extra logic) it will see from the external state that the
> > files were already downloaded and will not download them again. If you
> > use the in-memory state, it will work as expected - next time you run it
> > via backfill, it will re-download all files.
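
For what it's worth, the in-memory approach described above might look
roughly like this. It is only a sketch: download_all, the fetch callable, and
max_rounds are invented names, and a simulated flaky fetch stands in for the
real FTP transfer.

```python
from typing import Callable, Dict, Iterable, Set


def download_all(files: Iterable[str], fetch: Callable[[str], None],
                 max_rounds: int = 5) -> None:
    """Retry failed files within one run; progress lives only in memory."""
    pending: Set[str] = set(files)
    for _ in range(max_rounds):
        for name in sorted(pending):  # sorted() copies, so discard is safe
            try:
                fetch(name)
            except IOError:
                continue  # leave it in pending and try again next round
            pending.discard(name)
        if not pending:
            return
    raise RuntimeError("gave up on: %s" % ", ".join(sorted(pending)))


# Simulated unreliable transfer: the first attempt for each file fails.
attempts: Dict[str, int] = {}


def flaky_fetch(name: str) -> None:
    attempts[name] = attempts.get(name, 0) + 1
    if attempts[name] == 1:
        raise IOError("connection dropped")


download_all(["a.csv", "b.csv"], flaky_fetch)
```

Because pending is rebuilt from the full file list on every run, a fresh run
(or a backfill) starts from scratch, which is the behaviour argued for above.
It does not help if the whole process dies, which is Damian's case.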
> >
> > J.
> >
> >
> > > Damian
> > >
> > > -----Original Message-----
> > > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > > Sent: Friday, January 10, 2020 11:45
> > > To: dev@airflow.apache.org
> > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > > Rescheduling in Operators
> > >
> > > Just opening Pandora's box :).
> > >
> > > I personally think we should not keep state between retries. It
> > > opens up the possibility of a whole host of problems and edge cases,
> > > and allows people to solve some problems in non-Airflow'y ways -
> > > losing some important properties (mainly idempotency). Tasks in
> > > Airflow should be idempotent and stateless (from the operator
> > > author's point of view).
> > >
> > > I think there is quite a big conceptual difference between keeping
> > > the reschedule state (it's just an optimisation of the execution of
> > > the same task) and keeping state between retries.
> > >
> > > Right now when you write your operator it's simple - no state to
> > > handle. XComs (and everything else) are cleared when the task is
> > > re-run. With the Poke reschedule proposal, the only thing you can do
> > > is to save/retrieve a single ID attached to the current task
> > > instance. This ID will not be cleared on reschedule, but it will be
> > > cleared on retry.
> > >
> > > If we introduce saving state on retries, it opens up a lot of
> > > questions - should we keep all retries, or just one? What data
> > > should we keep - should we allow more structured data? What
> > > guidelines should people follow when writing their operators? And
> > > it's a totally different feature that should be discussed
> > > separately.
> > >
> > > J.
> > >
> > >
> > > On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <
> > > damian.sha...@credit-suisse.com> wrote:
> > >
> > > > I just wanted to add that a related use case is task retries.
> > > > There are lots of scenarios where keeping state between retries as
> > > > well as reschedules would be extremely helpful, so as long as
> > > > whatever the solution is isn't overly narrow I'd be extremely
> > > > appreciative.
> > > >
> > > > Damian
> > > >
> > > > -----Original Message-----
> > > > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > > > Sent: Friday, January 10, 2020 11:05
> > > > To: dev@airflow.apache.org
> > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > > > Rescheduling in Operators
> > > >
> > > > Also another point to discuss here. As the original author of the
> > > > idea of using a prefix in XCom, I think I have changed my mind
> > > > after the discussions. I think that simply adding a field to an
> > > > existing table (TaskReschedule?) where we could keep all the data
> > > > that needs to be persisted seems to be a good idea. We do not
> > > > impact performance too much (the table is already queried), we do
> > > > not add too much complexity, and we do not try to introduce a
> > > > generic "state" storage - this would be a solution dedicated only
> > > > to handling rescheduling.
> > > >
> > > > On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko
> > > > <fo...@driesprong.frl>
> > > > wrote:
> > > >
> > > > > The repoke logic as it is now implemented in the sensor is able
> > > > > to recover from an unexpected crash. After each poke, it will
> > > > > just go to sleep. If the process crashes in between, it might
> > > > > become a zombie task in the end, but this is also taken care of
> > > > > by the scheduler. In this case, the scheduler thinks the task is
> > > > > still running, but in reality it has crashed. There is a timeout
> > > > > that will reset the execution. Hopefully this doesn't happen
> > > > > often, and it should only occur when something is off (for
> > > > > example a machine crashed, or a network partition, etc.). HTH
> > > > >
> > > > > Personally I don't like duplicating the same table for such a
> > > > > similar use case. But that's a design choice I guess.
> > > > >
> > > > > If we go for the async executor, the above might be different. I
> > > > > think it is good to not include this in the discussion.
> > > > >
> > > > > Cheers, Fokko
> > > > >
> > > > >
> > > > > On Thu, Jan 9, 2020 at 19:33, Darren Weber
> > > > > <dweber.consult...@gmail.com> wrote:
> > > > >
> > > > > > Not sure whether to add to this email thread or the google-doc
> > > > > > (not sure if that google-doc is just meeting notes or if it
> > > > > > should evolve into a spec :grin:).
> > > > > >
> > > > > > Maybe a stupid suggestion, but here it is anyway:
> > > > > >
> > > > > > XCom - communication between elements of a DAG
> > > > > >
> > > > > > XState - key/value store available for each element of a DAG
> > > > > >
> > > > > > Clearly separate the behavior of a stateful resource (XState)
> > > > > > from one that is not intended to be stateful (XCom), if that
> > > > > > makes any sense? (Creating a new XState feature is similar to
> > > > > > a new db-table, I guess.)
> > > > > >
> > > > > > Just to explain what I understand about the goals of how
> > > > > > Airflow should behave when an operator has some ability to
> > > > > > reschedule pokes, and the scope of the changes.
> > > > > >
> > > > > > In the big picture, it's important that Airflow can resurrect
> > > > > > a DAG on a restart when some elements of the DAG contain
> > > > > > operators/sensors that depend on external cloud operations
> > > > > > (e.g. AWS Batch). This is feasible when Airflow can persist
> > > > > > any unique job ID defined by the external job provider (e.g.
> > > > > > AWS Batch "jobId") and any related identifiers for the job
> > > > > > (e.g. AWS Batch infrastructure ARNs for batch
> > > > > > queue/compute-env etc.; all of this detail is captured in the
> > > > > > AwsBatchOperator already). Assuming Airflow runs a DAG that
> > > > > > spins up 100s or 1000s of such external jobs and persists the
> > > > > > external "jobId", when Airflow crashes or is stopped for
> > > > > > upgrades etc. and is restarted, the operators that submitted
> > > > > > the jobs should be able to check on the state of those
> > > > > > previously submitted jobs. If the jobs are still running on
> > > > > > the external provider (e.g. AWS Batch), it should be able to
> > > > > > resume monitoring (poking) the job status without
> > > > > > re-submitting a duplicate job (also, any failure to poke a job
> > > > > > should have some level of poke-retry behavior, so that it does
> > > > > > not immediately fail the Airflow task and thereby re-submit
> > > > > > the same job that is already running).
> > > > > >
> > > > > > So, in that context, what is the scope of the
> > > > > > "reschedule-poke" changes? Do they simply release a worker, so
> > > > > > that as long as Airflow is "up" (has not crashed) the
> > > > > > reschedule can resume poking, but if Airflow crashes, the
> > > > > > whole thing starts over again because the state of the task is
> > > > > > not resilient to Airflow crashing? Or does the work on the
> > > > > > "reschedule-poke" also provide resilience when Airflow
> > > > > > crashes? If the goal is to be resilient to Airflow crashes,
> > > > > > what is required for the "reschedule-poke" work to accomplish
> > > > > > that goal, if it doesn't already? (Would the architecture for
> > > > > > Airflow resilience be out of scope in this context because it
> > > > > > involves more complexity, like a Kafka cluster?)
> > > > > >
> > > > > > -- Darren
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk
> > > > > > <jarek.pot...@polidea.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Commented as well. I think we are really going in a good
> > > > > > > direction!
> > > > > > >
> > > > > > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko
> > > > > > > <fo...@driesprong.frl> wrote:
> > > > > > >
> > > > > > > > Thanks Jacob for building the document. I think we're on
> > > > > > > > the right track. I've added some comments and
> > > > > > > > clarification to the document, to validate we're looking
> > > > > > > > in the same direction. Would love to get more people's
> > > > > > > > opinion on this.
> > > > > > > >
> > > > > > > > Cheers, Fokko
> > > > > > > >
> > > > > > > > On Wed, Jan 8, 2020 at 03:31, Jacob Ferriero
> > > > > > > > <jferri...@google.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > The image is not working on the dev list; here is a
> > > > > > > > > link to the GitHub review comment containing said image:
> > > > > > > > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > > > > > > > >
> > > > > > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero
> > > > > > > > > <jferri...@google.com> wrote:
> > > > > > > > >
> > > > > > > > >> Hello Dev List,
> > > > > > > > >>
> > > > > > > > >> The inspiration for this is to allow operators to start
> > > > > > > > >> a long-running task on an external system and
> > > > > > > > >> reschedule pokes for completion (e.g. a Spark job on
> > > > > > > > >> Dataproc) instead of blocking a worker (sketched out in
> > > > > > > > >> #6210 <https://github.com/apache/airflow/pull/6210>),
> > > > > > > > >> freeing up slots between pokes. Doing this requires
> > > > > > > > >> supporting a method for storing task state between
> > > > > > > > >> reschedules.
> > > > > > > > >> It's worth noting that a task would maintain state only
> > > > > > > > >> during reschedules but clear state on retries. In this
> > > > > > > > >> way the task is idempotent before reaching a terminal
> > > > > > > > >> state [SUCCESS, FAIL, UP_FOR_RETRY]. This brings up a
> > > > > > > > >> question of the scope of commitment to idempotency of
> > > > > > > > >> operators. If it is deemed acceptable for reschedules
> > > > > > > > >> to maintain some state, then we can free up workers
> > > > > > > > >> between pokes.
> > > > > > > > >>
> > > > > > > > >> Because this is very similar to the purpose of XCom,
> > > > > > > > >> it's been postulated that we should support this
> > > > > > > > >> behavior in XCom rather than provide a new model in the
> > > > > > > > >> db for TaskState. (Though discussion here on which is
> > > > > > > > >> more appropriate is more than welcome.)
> > > > > > > > >>
> > > > > > > > >> I'd like to put forward a proposal to resurrect the
> > > > > > > > >> reverted #6370
> > > > > > > > >> <https://github.com/apache/airflow/pull/6370> in order
> > > > > > > > >> to provide a modification to the lifetime of XComs
> > > > > > > > >> under certain conditions. The diagram below helps
> > > > > > > > >> illustrate the change originally proposed in #6370.
> > > > > > > > >> There was concern about changing existing behavior
> > > > > > > > >> (potentially breaking) and the fact that this makes
> > > > > > > > >> operators stateful. Per the review comments and an
> > > > > > > > >> informal discussion (meeting notes
> > > > > > > > >> <https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0osesPG7jVZ3oU/edit#>
> > > > > > > > >> and #sig-async-operators) I'd like to modify the
> > > > > > > > >> approach of #6370 to only skip clearing of XCom if the
> > > > > > > > >> XCom key is prefixed with
> > > > > > > > >> `airflow.models.xcom.DO_NOT_CLEAR_PREFIX =
> > > > > > > > >> "_STATEFUL_"` or similar.
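
To check my understanding of the prefix rule, here is a sketch of the
proposed clearing behaviour using a plain dict in place of the XCom table.
Only DO_NOT_CLEAR_PREFIX and its value come from the proposal; the function
names and sample keys are assumptions for illustration, not Airflow APIs.

```python
from typing import Any, Dict

DO_NOT_CLEAR_PREFIX = "_STATEFUL_"  # name and value as given in the proposal


def clear_for_reschedule(xcoms: Dict[str, Any]) -> Dict[str, Any]:
    """On reschedule, keep only keys carrying the do-not-clear prefix."""
    return {key: value for key, value in xcoms.items()
            if key.startswith(DO_NOT_CLEAR_PREFIX)}


def clear_for_retry(xcoms: Dict[str, Any]) -> Dict[str, Any]:
    """On retry, clear everything so the task starts from scratch."""
    return {}


xcoms = {"_STATEFUL_job_id": "external-123", "row_count": 42}
after_reschedule = clear_for_reschedule(xcoms)
after_retry = clear_for_retry(after_reschedule)
```

This keeps existing behaviour for any key without the prefix, which seems to
address the concern about breaking changes.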
> > > > > > > > >>
> > > > > > > > >> [image: image.png]
> > > > > > > > >> --
> > > > > > > > >>
> > > > > > > > >> *Jacob Ferriero*
> > > > > > > > >>
> > > > > > > > >> Strategic Cloud Engineer: Data Engineering
> > > > > > > > >>
> > > > > > > > >> jferri...@google.com
> > > > > > > > >>
> > > > > > > > >> 617-714-2509 <(617)%20714-2509>
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Jarek Potiuk
> > > > > > > Polidea <https://www.polidea.com/> | Principal Software
> > > > > > > Engineer
> > > > > > >
> > > > > > > M: +48 660 796 129 <+48660796129>
> > > > > > > [image: Polidea] <https://www.polidea.com/>
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Darren L. Weber, Ph.D.
> > > > > > http://psdlw.users.sourceforge.net/
> > > > > > http://psdlw.users.sourceforge.net/wordpress/
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > ===============================================================================
> > > >
> > > > Please access the attached hyperlink for an important electronic
> > > > communications disclaimer:
> > > > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> > > > ===============================================================================
> > > >
> > > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
> >
> >
>
>
>
>
>
