@Kaxil

> If you can give us more scenarios where
> you think stateful set can solve the issue, please let us know. Also, why
> do you think Variables are not the correct solution for it?



*Scenario 1: incremental data pull*
If you are incrementally pulling data from a database.  Each time you only
want to pull the records that are modified.  So you want to keep track of
`datetime_modified` column.
Each run, you check the max modified date in source and store it.  This is
your "high watermark" for this run.  Next run, you pull from last high
watermark.
In a sense you can't really design this process to be idempotent: if you
rerun the interval ('2019-12-01T10:00:00', '2019-12-01T11:00:00') you might
not get the same data (or any data at all) because in the source, those
records may have been updated (with new modified date).

*Scenario 2: incremental dim / fact / reporting processes in database*
Suppose I am building a fact table.  it has 10 source tables.   I need to
make this run incrementally.  It's possible that there may be differences
in update cadence in source tables.  One way to approach this is in each
run you calculate max modified in each source table, and take the min of
all of them.  That's your high watermark for this run.  Next time, you have
to process data from there.  This value needs to be stored somewhere.


*Scenario 3: dynamic date range / resuming large "initial load" processes*
Pulling data from APIs, often we might want it to run daily or hourly.
Using backfill functionality is sometimes prohibitively slow because you
have to carve up years of data into hourly or daily chunks.  One approach
is make a temporary `backfill_` job with modified schedule (e.g. monthly)
and let that run from beginning of time (with catchup=True).
Alternatively you could instead design in stateful way.  On initial run,
pull from beginning of time.  Thereafter, pull from last run time (and
maybe you need to do a lookback of 45 days or something, because data in
source can change).  Perhaps in your initial load you don't want to pull by
day (too slow) but you also don't want to pull in one batch -- so you carve
up batches that are appropriate to the situation.  And this is where it's
helpful to have a state persistence mechanism: you can use this to store
your progress on initial load, and in the event of failure, resume from
point of failure.  Yes you _could_ parse it from s3 or wherever, but doing
so presents its own challenges and risks, and it is convenient to just
store it in the database -- and not necessarily any more problematic.

*Scenario 4: no access*
As pointed out earlier, sometimes you might not have access to target.
E.g. i am pushing to someone elses s3 bucket and we only have PutObject but
can't read what's there.  So we can't look at target to infer state.

I'm sure there are other use cases out there.  Anything "incremental"
implies a state.

*Why not variables?  *
For sure they can be used for this kind of thing.   But a tad messy.   I
think if variables had a `namespace` column added to the primary key, that
would help.

I have experimented with two different alternative approaches at my
organization: ProcessState and TaskState.  And so far I like both of them.

Note: I am not suggesting these tables should be added to airflow -- I am
just sharing, in response to the question why not variables, and in the
context of thinking about how state can be handled more generally.

*TaskState* has primary key of dag_id + task_id with a `value` column that
is json data.  Users can persist arbitrary state there for their task.  It
is essentially a variable that is tied to a specific task.  The benefit of
this compared to variable is there is no need to think about naming
convention -- the operator can handle that for you because it knows dag id
and task id.   It's not TaskInstanceState; i.e. the scope is the *task*,
and not the task *run*.

I was gonna say that TaskState is equivalent to adding a state_data column
to the `task` table but there is no such table :)

I also created a *ProcessState* table.  ProcessState has a primary key of
namespace + process_name.  Otherwise it is identical to TaskState.  The
cost of using ProcessState is you have to choose a namespace and
process_name.  The reward, however, is that you can now freely move a task
from one dag to another (or rename it) without having to perform any
updates in the metastore.  So if we need to move a process from one
schedule to another, we can.  This you cannot do with XCom or TaskState.

ProcessState essentially is Variable but with an added `namespace` that
helps with organization and naming that indicates its specific purpose.

I like TaskState because you don't have to think about naming.  But for
some processes it's nice to be able change schedule i.e. move to different
dag easily.

When I use ProcessState, the process_name is driven by the target object,
e.g. table name, and the naming is controlled by a helper class.

One thing we don't get with these models is history -- tracking state over
time.  For my case, at this time, I don't really care about the history.
It could be obtained in logs in an emergency.  And I like the simplicity of
the view when you only have one record per process.





On Sat, Jan 11, 2020 at 7:12 AM Kaxil Naik <kaxiln...@gmail.com> wrote:

> @Fokko:
> If we go ahead with storing this info in Xcom, is your suggestion to use a
> fixed Prefix?
>
> Cheers,
> Kaxil
>
> On Sat, Jan 11, 2020, 14:50 Driesprong, Fokko <fo...@driesprong.frl>
> wrote:
>
> > I would still be in favor of pushing this into xcom, however not
> > changing the behavior of the current xcom implementation. Xcom is now for
> > intra-communication, but it could also be inter-communication, for me it
> is
> > very closely related. So having an additional option to explicitly
> include
> > the state of the current operator. TaskReschedule, as Jarek
> > mentioned, would also be an option, but this is on a TaskRun level and I
> > believe it should be on a Task level. As mentioned earlier, Variables
> feels
> > very messy to me. This is on a global level, so then you should template
> > the dag_id, task_id in there? It would also create a lot of entries in
> the
> > table.
> >
> > Regarding the FTP issues. Airflow is not going to magically solve your
> FTP
> > connections. If the FTP server is down, then it is okay for the operator
> to
> > fail, and retry somewhere later and then the FTP server is hopefully back
> > up. If there are flakey network issues, then you should implement some
> > retrying mechanism. We had a similar use case when using HTTP. An Airflow
> > user was listing through a paged REST API. If fetching one the pages
> > failed, the operator would fail and it had to start all over again. Using
> > Tenacity this has been fixed:
> >
> >
> https://github.com/apache/airflow/blob/fd78c65cabae2241a4c1d3a792e00620049cbf3e/airflow/hooks/http_hook.py#L186
> >
> > Ideally, you would like to have a path that contains the day in the FTP
> > path, so you know which files to copy for which day, and you can also
> > backfill this. This would exclude race conditions since you can do
> multiple
> > days in parallel.
> >
> > Using xcom this would also be possible. First have an operator that will
> > list the files on the FTP site, push this to xcom. Have another operator
> > that fetches the files that you already have and push this to xcom as
> well.
> > Using a Python operator you can easily do a diff, and then you know which
> > files to download. In this case, you should limit the active dag-runs to
> > one, to avoid race conditions.
> >
> > I believe that for many use cases you don't need to keep state in
> Airflow,
> > and this might be convenient, but it is just shifting the issue. If you
> can
> > fetch the issue from somewhere external, and this is the one and single
> > truth, then this should be the preferred solution.
> >
> > Cheers, Fokko
> >
> > Op za 11 jan. 2020 om 04:21 schreef Kaxil Naik <kaxiln...@gmail.com>:
> >
> > > Hey all,
> > >
> > > Really good document Jacob.
> > >
> > > Below are my thoughts on different topics discussed in the docs and the
> > > mailing list:
> > >
> > >
> > > *Prefix on Xcom*
> > > I don't think that is a good idea to mix this into Xcom. We should let
> > Xcom
> > > be used for exactly one purpose.
> > >
> > > *Storing state in Xcom between Retries*
> > > This is definitely going to break idempotency. When the default retries
> > are
> > > enabled this is going to create undesired effects.
> > >
> > > @Daniel Standish : I would like to more understand the needs of
> Stateful
> > > sets for sure. If you can give us more scenarios where
> > > you think stateful set can solve the issue, please let us know. Also,
> why
> > > do you think Variables are not the correct solution for it?
> > >
> > > I would imagine your custom operator can store some state in Variables.
> > For
> > > example, you can store a json containing the following in
> > > Airflow Variables:
> > >
> > >    - all_files
> > >    - files_copied
> > >
> > > The variable, in this case, would have the details it needs to resume
> the
> > > copying from where it stopped. You custom operator as a first
> > > the step should check the Variable (with deserialized JSON).
> > >
> > > *The new structure for storing Stateful structure for reschedules*
> > > This can be a new table that has a relationship with the TI table or
> > just a
> > > new column and this column can be loaded only when using
> > > Reschedule/async operators or sensors.
> > >
> > > Regards,
> > > Kaxil
> > >
> > > On Fri, Jan 10, 2020 at 11:45 PM Yingbo Wang <ybw...@gmail.com> wrote:
> > >
> > > > The updated AIP with smart sensor design and some implementation is
> in
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
> > > > Open source PR https://github.com/apache/airflow/pull/5499
> > > >
> > > >
> > > > On Fri, Jan 10, 2020 at 1:44 PM Alex Guziel <alex.guz...@airbnb.com
> > > > .invalid>
> > > > wrote:
> > > >
> > > > > I feel like for this, we can incorporate the smart sensor we have
> > > > > implemented at Airbnb that we plan on open sourcing.
> > > > >
> > > > > The TL;DR is that it works by having the Sensor task run briefly
> and
> > > > > materialize some state into the DB which master sensor tasks poke
> > for.
> > > > This
> > > > > can be with custom time intervals.
> > > > >
> > > > > On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish <
> > dpstand...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > I also am a big fan of adding better support for stateful tasks,
> > > > though I
> > > > > > know this is a thorny subject in airflow community.
> > > > > >
> > > > > > There are many data warehousing tasks where state makes a lot of
> > > sense.
> > > > > > While idempotence is a nice design pattern it's not the solution
> > for
> > > > > every
> > > > > > problem.
> > > > > >
> > > > > > XCom may not be the way, but there should be a way.  Variables
> > work,
> > > > but
> > > > > to
> > > > > > me it makes sense to have a separate structure that is associated
> > > with
> > > > > the
> > > > > > task, or the dag, or the task instance.
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P. <
> > > > > > damian.sha...@credit-suisse.com> wrote:
> > > > > >
> > > > > > > FYI the design of the already discussed pull would allow state
> to
> > > be
> > > > > > > persisted across retries:
> > > > > > >
> > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > > > > While
> > > > > > > I agree in most cases you are correct I would however be
> greatly
> > > > > > > appreciated to not explicitly exclude this capability in the
> > design
> > > > of
> > > > > > > keeping state across reschedules.
> > > > > > >
> > > > > > > In lost of cases I already do exactly what you suggest, I scan
> > the
> > > > > state
> > > > > > > of the target system and resume from there. However in lengthy
> > > > > pipelines
> > > > > > > this becomes complex, for example I have a pipeline that goes
> > > > something
> > > > > > > like:   FTP Download -> Decrypt File and Zip File -> Upload to
> > Jump
> > > > > Host
> > > > > > > and remove Zip File -> Store in S3 Bucket.
> > > > > > >
> > > > > > > The data needs to be available at the end state as soon as
> > possible
> > > > so
> > > > > > the
> > > > > > > decryption operator is a sensor that is already running and
> waits
> > > for
> > > > > the
> > > > > > > file to be available and immediately decrypts and zips the
> file,
> > > same
> > > > > for
> > > > > > > the upload operator. From inside the corporate network
> > environment
> > > > it's
> > > > > > not
> > > > > > > possible to check the state of the s3 bucket so the orriginal
> FTP
> > > > > > Download
> > > > > > > process can not check the state of the final final target
> system.
> > > > Even
> > > > > if
> > > > > > > it was this could lead to a race condition if the data is in
> > > transit.
> > > > > > >
> > > > > > > I guess in environments where you have a lot of control and
> > aren't
> > > > > > > beholden to capracious policy, audit, and regulatory
> requirements
> > > > such
> > > > > > > scenarios must indeed seem niche :). Anyway we have a soluton,
> > just
> > > > > > asking
> > > > > > > you don't go out of your way to stop users from shooting
> > themselves
> > > > in
> > > > > > the
> > > > > > > foot if they're really determined to.
> > > > > > >
> > > > > > > Damian
> > > > > > >
> > > > > > > -----Original Message-----
> > > > > > > From: Chris Palmer <ch...@crpalmer.com>
> > > > > > > Sent: Friday, January 10, 2020 13:37
> > > > > > > To: dev@airflow.apache.org
> > > > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and
> > Poke
> > > > > > > Rescheduling in Operators
> > > > > > >
> > > > > > > I agree with Jarek that maintaining state between retries is
> not
> > > the
> > > > > > right
> > > > > > > thing to do. To be honest I'm not even convinced by the need
> for
> > > > state
> > > > > > > between reschedules myself.
> > > > > > >
> > > > > > > While I know from past experience that FTP is a pain to deal
> > with,
> > > I
> > > > > > think
> > > > > > > that your example is a pretty niche one. Additionally, when
> > > thinking
> > > > > > about
> > > > > > > idempotent task design, lots of tasks utilize state that exists
> > in
> > > > > other
> > > > > > > systems. You should be thinking about what state you want some
> > > > external
> > > > > > > system to be in after the task has run, rather than precisely
> > what
> > > > > > actions
> > > > > > > you want the task to do.
> > > > > > >
> > > > > > > It's the subtle difference between:
> > > > > > >
> > > > > > > "When it runs, this task should create the required table in my
> > > > > database"
> > > > > > > (by running a simple 'CREATE TABLE foobar .....')
> > > > > > >
> > > > > > > and
> > > > > > >
> > > > > > > "After this tasks has finished, the required table should exist
> > in
> > > my
> > > > > > > database" (by running 'CREATE TABLE IF NOT EXISTS foobar
> .....')
> > > > > > >
> > > > > > >
> > > > > > > The first will fail if run repeatedly (without someone taking
> > some
> > > > > other
> > > > > > > action like deleting the table). The second can be run as many
> > > times
> > > > as
> > > > > > you
> > > > > > > want without error, but it relies on the state that is
> maintained
> > > by
> > > > > your
> > > > > > > database.
> > > > > > >
> > > > > > > In your case the external state I think you should care about
> is
> > > the
> > > > > file
> > > > > > > system you are downloading the files to, as opposed to some
> > > external
> > > > > > table
> > > > > > > that could get out of sync with the file system. So I would
> write
> > > the
> > > > > > > operator so that the first thing it does is compare the
> complete
> > > list
> > > > > > with
> > > > > > > what already exists in the destination, and then only attempt
> to
> > > > > download
> > > > > > > the ones that are missing.
> > > > > > >
> > > > > > > Chris
> > > > > > >
> > > > > > > On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk <
> > > > > jarek.pot...@polidea.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > I wonder what others think of it.
> > > > > > > >
> > > > > > > > On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
> > > > > > > > damian.sha...@credit-suisse.com> wrote:
> > > > > > > >
> > > > > > > > > I don't believe so, the default should be that state isn't
> > > > > preserved
> > > > > > > > > across retries, just that it's possible for the user to
> > enable
> > > it
> > > > > if
> > > > > > > > > they are willing to take on that complexity.
> > > > > > > >
> > > > > > > >
> > > > > > > > > We have lots of operators that do this already as if they
> > fail
> > > > part
> > > > > > > > > way through a job the overhead of resuming from the
> beginning
> > > > > rather
> > > > > > > > > than having state on their progress is too much, just
> > annoying
> > > we
> > > > > > > > > have to keep this state outside Airflow as it requires
> extra
> > > > > > > > > infrastructure for our
> > > > > > > > task
> > > > > > > > > scheduling.
> > > > > > > > >
> > > > > > > > > For example we have an FTP site that we need to download
> 250
> > > > files
> > > > > > > > > from, the full file list is provided to the operator, the
> FTP
> > > > > > > > > connection is
> > > > > > > > very
> > > > > > > > > unreliable and the job often fails midway, on retry we
> don't
> > > want
> > > > > to
> > > > > > > > resume
> > > > > > > > > from the beginning of the job so we store the state of our
> > > > progress
> > > > > > > > > in a table outside Airflow. We can't split the job in to
> 250
> > > > tasks
> > > > > > > > > because the FTP site only accepts 1 connection at a time so
> > the
> > > > > > > > > overhead of 250
> > > > > > > > logins
> > > > > > > > > would add an hour to the process and it would make the
> > Airflow
> > > UI
> > > > > > > > > near unusable.
> > > > > > > > >
> > > > > > > >
> > > > > > > > I do not know all the details of course - but your case seems
> > to
> > > be
> > > > > > > > solvable much easier and in "Airflow" way. You can have
> custom
> > > > > > > > operator that continues running until everything is
> downloaded
> > > and
> > > > > > > > retries failed transfer. The state of which file is
> downloaded
> > > > should
> > > > > > > > be kept in memory and even if FTP operation fails, it should
> > > retry
> > > > > > > > each failed file rather than fail the whole operator.  That
> > would
> > > > > keep
> > > > > > > > it idempotent, and keep the state in memory rather than in
> > > > Airflow's
> > > > > > > > DB or in external system. Even if you already have an
> operator
> > > that
> > > > > > > > transfers X files already and you do not want to change it,
> you
> > > can
> > > > > > > > likely wrap it/extend to keep list of files in memory and
> retry
> > > > only
> > > > > > > > those files that failed so far. IMHO In your solution you do
> > > > exactly
> > > > > > > > what you are not supposed to according to Airflow's design -
> > > unless
> > > > > > > > you do some extra logic and complexity your operator is not
> > > > > idempotent.
> > > > > > > >
> > > > > > > > For example - If you delete downloaded files for whatever
> > reason
> > > > and
> > > > > > > > keep the external state and run backfill, I believe what will
> > > > happen
> > > > > > > > (unless you have some extra logic) it will see (from external
> > > > state)
> > > > > > > > that the files were already downloaded and will not download
> > them
> > > > > > > > again. If you use the in-memory state, it will work as
> > expected -
> > > > > next
> > > > > > > > time you run it via back-fill,  it will re-download all
> files.
> > > > > > > >
> > > > > > > > J.
> > > > > > > >
> > > > > > > >
> > > > > > > > > Damian
> > > > > > > > >
> > > > > > > > > -----Original Message-----
> > > > > > > > > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > > > > > > > > Sent: Friday, January 10, 2020 11:45
> > > > > > > > > To: dev@airflow.apache.org
> > > > > > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs
> and
> > > > Poke
> > > > > > > > > Rescheduling in Operators
> > > > > > > > >
> > > > > > > > > Just opening the Pandora's box :).
> > > > > > > > >
> > > > > > > > > I personally think we should not keep state between
> retries.
> > It
> > > > > > > > > opens up possibilities to whole host of problems and edge
> > > cases,
> > > > > and
> > > > > > > > > allows people to solve some approaches in non-airflow'y
> ways
> > -
> > > > > > > > > losing some important properties (mainly idempotency).
> Tasks
> > in
> > > > > > > > > Airflow should be idempotent
> > > > > > > > and
> > > > > > > > > stateless from the operator's author point of view).
> > > > > > > > >
> > > > > > > > > I think there is quite a big conceptual difference between
> > > > keeping
> > > > > > > > > the reschedule state (it's just optimising of execution of
> > the
> > > > same
> > > > > > > > > task) and keeping state between retries.
> > > > > > > > >
> > > > > > > > > Right now when you write your operator it's simple - no
> state
> > > to
> > > > > > > handle.
> > > > > > > > > XComs (and everything else) is cleared when task is re-run.
> > > > > > > > > With Poke reschedule proposal - the only thing you can do
> is
> > to
> > > > > > > > > save/retrieve a single ID attached to the current task
> > > instance.
> > > > > > > > > This id will not be cleared on reschedule, but it will be
> > > cleared
> > > > > on
> > > > > > > retry.
> > > > > > > > >
> > > > > > > > > If we introduce saving state on retries, it opens up a lot
> of
> > > > > > > > > questions - should we keep all retries? or just one? What
> > data
> > > > > > > > > should we keep -
> > > > > > > > should
> > > > > > > > > we allow more structured data? What guidelines should
> people
> > > > follow
> > > > > > > > > when writing their operators ? And it's a totally different
> > > > feature
> > > > > > > > > that
> > > > > > > > should
> > > > > > > > > be discussed separately.
> > > > > > > > >
> > > > > > > > > J.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <
> > > > > > > > > damian.sha...@credit-suisse.com> wrote:
> > > > > > > > >
> > > > > > > > > > I just wanted to add a related use case is task retries,
> > > there
> > > > > are
> > > > > > > > > > lots of scenarios where keeping state between the retries
> > as
> > > > well
> > > > > > > > > > as the reschedules would be extremely helpful, so as long
> > as
> > > > > > > > > > whatever the solution is isn't overly narrow I'd be
> > extremely
> > > > > > > appreciative.
> > > > > > > > > >
> > > > > > > > > > Damian
> > > > > > > > > >
> > > > > > > > > > -----Original Message-----
> > > > > > > > > > From: Jarek Potiuk <jarek.pot...@polidea.com>
> > > > > > > > > > Sent: Friday, January 10, 2020 11:05
> > > > > > > > > > To: dev@airflow.apache.org
> > > > > > > > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs
> > and
> > > > > Poke
> > > > > > > > > > Rescheduling in Operators
> > > > > > > > > >
> > > > > > > > > > Also another point to discuss here. As an original author
> > of
> > > > the
> > > > > > > > > > idea of using prefix in xcom, I think after the
> > discussions I
> > > > > > > > > > changed my mind. I think that simply adding a field to an
> > > > > existing
> > > > > > > > > > table
> > > > > > > > > > (TaskReschedule?) where we could keep all the data that
> > need
> > > to
> > > > > be
> > > > > > > > > > persisted, seems to be a good idea. We do not impact
> > > > performance
> > > > > > > > > > too much (the table is already
> > > > > > > > > > queried) , we do not add too much complexity and we do
> not
> > > try
> > > > to
> > > > > > > > > > introduce a generic "state" storage - this would be a
> > > solution
> > > > > > > > > > dedicated to only handle rescheduling.
> > > > > > > > > >
> > > > > > > > > > On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko
> > > > > > > > > > <fo...@driesprong.frl>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > The repoke logic as it is now implemented with the
> > sensor,
> > > is
> > > > > > > > > > > able to recover from an unexpected crash. After each
> > poke,
> > > it
> > > > > > > > > > > will just go to sleep. If the process crashes in
> between,
> > > it
> > > > > > > > > > > might become a zombie task in the end, but this is also
> > > taken
> > > > > > > > > > > care of by the scheduler. In this case, the scheduler
> > > thinks
> > > > > the
> > > > > > > > > > > task is still running, but in
> > > > > > > > > > reality, it crashes.
> > > > > > > > > > > There is a timeout that will reset the execution.
> > > Hopefully,
> > > > > > > > > > > this doesn't happen often, and should only occur when
> > > > something
> > > > > > > > > > > is off (for example a machine crashed, or a network
> > > > partition,
> > > > > > > > > > > etc). HTH
> > > > > > > > > > >
> > > > > > > > > > > Personally I don't like duplicating the same table for
> > > such a
> > > > > > > > > > > similar use case. But that's a design choice I guess.
> > > > > > > > > > >
> > > > > > > > > > > If we go for the async executor, the above might be
> > > > different.
> > > > > I
> > > > > > > > > > > think it is good to not include this in the discussion.
> > > > > > > > > > >
> > > > > > > > > > > Cheers, Fokko
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Op do 9 jan. 2020 om 19:33 schreef Darren Weber <
> > > > > > > > > > > dweber.consult...@gmail.com
> > > > > > > > > > > >:
> > > > > > > > > > >
> > > > > > > > > > > > Not sure whether to add to this email thread or the
> > > > > google-doc
> > > > > > > > > > > > (not sure
> > > > > > > > > > > if
> > > > > > > > > > > > that google-doc is just a meeting-notes or if it
> should
> > > > > evolve
> > > > > > > > > > > > into a
> > > > > > > > > > > spec
> > > > > > > > > > > > :grin:).
> > > > > > > > > > > >
> > > > > > > > > > > > Maybe a stupid suggestion, but here it is anyway:
> > > > > > > > > > > >
> > > > > > > > > > > > XCom - communication between elements of a DAG
> > > > > > > > > > > >
> > > > > > > > > > > > XState - key/value store available for each element
> of
> > a
> > > > DAG
> > > > > > > > > > > >
> > > > > > > > > > > > Clearly separate the behavior of a stateful resource
> > > > (XState)
> > > > > > > > > > > > from one
> > > > > > > > > > > that
> > > > > > > > > > > > is not intended to be stateful (XCom), if that makes
> > any
> > > > > sense?
> > > > > > > > > > > (Creating
> > > > > > > > > > > > a new XState feature is similar to a new db-table, I
> > > > guess.)
> > > > > > > > > > > >
> > > > > > > > > > > > Just to explain what I understand about the goals of
> > how
> > > > > > > > > > > > Airflow should behave when it has some ability for an
> > > > > operator
> > > > > > > > > > > > to reschedule pokes and
> > > > > > > > > > > the
> > > > > > > > > > > > scope of the changes.  In the big picture, it's
> > important
> > > > > that
> > > > > > > > > > > > Airflow
> > > > > > > > > > > can
> > > > > > > > > > > > resurrect a DAG on a restart when some elements of
> the
> > > DAG
> > > > > > > > > > > > contain operators/sensors that are dependent on
> > external
> > > > > cloud
> > > > > > > > > > > > operations
> > > > > > > > > (e.g.
> > > > > > > > > > > AWS
> > > > > > > > > > > > Batch).  This is feasible when Airflow can persist
> any
> > > > unique
> > > > > > > > > > > > job-ID defined by the external job provider (e.g. AWS
> > > Batch
> > > > > > > > > > > > "jobId") and any related identifiers for the job
> (e.g.
> > > AWS
> > > > > > > > > > > > Batch infrastructure ARNs for batch queue/compute-env
> > etc
> > > > and
> > > > > > > > > > > > all of this detail is captured in the
> AwsBatchOperator
> > > > > > already).
> > > > > > > > > > > > Assuming Airflow runs a DAG that spins up
> > > > > > > > > > > 100's
> > > > > > > > > > > > or 1000's of such external jobs and persists the
> > external
> > > > > > > > > > > > "jobId", when Airflow crashes or is stopped for
> > upgrades
> > > > etc.
> > > > > > > > > > > > and restarted, the operators that submitted the jobs
> > > should
> > > > > be
> > > > > > > > > > > > able to try to check on the state of those previously
> > > > > > > > > > > > submitted jobs.  If the jobs are still running
> > > > > > > > > > > on
> > > > > > > > > > > > the external provider (e.g. AWS Batch), it should be
> > able
> > > > to
> > > > > > > > > > > > resume monitoring (poking) the job status without
> > > > > > > > > > > > re-submitting a duplicate job (also any failure to
> > poke a
> > > > job
> > > > > > > > > > > > should have some level of poke-retry behavior that
> does
> > > not
> > > > > > > > > > > > immediately fail the Airflow task that results in
> > somehow
> > > > > > > > > > > > re-submitting the same job that is already running).
> > So,
> > > > in
> > > > > > > > > > > > that context, what is the scope of the
> "reshedule-poke"
> > > > > > > > > > > > changes - do they
> > > > > > > > > > > simply
> > > > > > > > > > > > release a worker and so long as Airflow is "up" (has
> > not
> > > > > > > > > > > > crashed), the reschedule can resume poking, but if
> > > Airflow
> > > > > > > > > > > > crashes, the whole thing starts over again because
> the
> > > > state
> > > > > > > > > > > > of the task is not resilient to
> > > > > > > > > > > Airflow
> > > > > > > > > > > > crashing?  Or, does the work on the "reschedule-poke"
> > > also
> > > > > > > > > > > > provide resilience when Airflow crashes?  If the goal
> > is
> > > to
> > > > > be
> > > > > > > > > > > > resilient to
> > > > > > > > > > > Airflow
> > > > > > > > > > > > crashes, what is required for the "reschedule-poke"
> > work
> > > to
> > > > > > > > > > > > accomplish
> > > > > > > > > > > that
> > > > > > > > > > > > goal, if it doesn't already?  (Would the architecture
> > for
> > > > > > > > > > > > Airflow resilience be out-of-scope in this context
> > > because
> > > > it
> > > > > > > > > > > > involves more complexity, like a Kafka cluster?)
> > > > > > > > > > > >
> > > > > > > > > > > > -- Darren
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk
> > > > > > > > > > > > <jarek.pot...@polidea.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Commented as well. I think we are really going in a
> > > good
> > > > > > > > direction!
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko
> > > > > > > > > > > > > <fo...@driesprong.frl
> > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks Jacob for building the document. I think
> > we're
> > > > on
> > > > > > > > > > > > > > the right
> > > > > > > > > > > > track.
> > > > > > > > > > > > > > I've added some comments and clarification to the
> > > > > > > > > > > > > > document, to
> > > > > > > > > > > validate
> > > > > > > > > > > > > > we're looking in the same direction. Would love
> to
> > > get
> > > > > > > > > > > > > > more people's opinion on this.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers, Fokko
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Op wo 8 jan. 2020 om 03:31 schreef Jacob Ferriero
> > > > > > > > > > > > > > <jferri...@google.com.invalid>:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Image not working on dev list here is link to
> the
> > > > > github
> > > > > > > > > > > > > > > review
> > > > > > > > > > > > comment
> > > > > > > > > > > > > > > containing said image:
> > > > > > > > > > > > > > >
> > > > > https://github.com/apache/airflow/pull/6370#issuecomment
> > > > > > > > > > > > > > > -546
> > > > > > > > > > > > > > > 58
> > > > > > > > > > > > > > > 2724
> > > > > > > > > > > .
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero <
> > > > > > > > > > > jferri...@google.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Hello Dev List,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> The inspiration for this is to allow operators
> > to
> > > > > start
> > > > > > > > > > > > > > >> a long
> > > > > > > > > > > > running
> > > > > > > > > > > > > > >> task on an external system and reschedule
> pokes
> > > for
> > > > > > > > > > > > > > >> completion
> > > > > > > > > > > (e.g
> > > > > > > > > > > > > > spark
> > > > > > > > > > > > > > >> job on dataproc), instead of blocking a worker
> > > > > > > > > > > > > > >> (sketched out in
> > > > > > > > > > > > #6210
> > > > > > > > > > > > > > >> <https://github.com/apache/airflow/pull/6210
> >)
> > to
> > > > > allow
> > > > > > > > > > > > > > >> freeing
> > > > > > > > > > > up
> > > > > > > > > > > > of
> > > > > > > > > > > > > > >> slots between pokes. To do this requires
> > > supporting
> > > > a
> > > > > > > > > > > > > > >> method for
> > > > > > > > > > > > > storing
> > > > > > > > > > > > > > >> task state between reschedules.
> > > > > > > > > > > > > > >> It's worth noting that a task would maintain
> > state
> > > > > only
> > > > > > > > > > > > > > >> during reschedules but clear state on retries.
> > In
> > > > this
> > > > > > > > > > > > > > >> way the task is
> > > > > > > > > > > > > > idempotent
> > > > > > > > > > > > > > >> before reaching a terminal state [SUCCES,
> FAIL,
> > > > > > > > UP_FOR_RETRY].
> > > > > > > > > > > This
> > > > > > > > > > > > > > brings
> > > > > > > > > > > > > > >> up a question of the scope of commitment to
> > > > > idempotency
> > > > > > > > > > > > > > >> of
> > > > > > > > > > > > operators.
> > > > > > > > > > > > > > If it
> > > > > > > > > > > > > > >> is deemed acceptable for reschedules to
> maintain
> > > > some
> > > > > > > > > > > > > > >> state, then
> > > > > > > > > > > we
> > > > > > > > > > > > > can
> > > > > > > > > > > > > > >> free up workers between pokes.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Because this is very similar to the purpose of
> > > XCom
> > > > > > > > > > > > > > >> it's been
> > > > > > > > > > > > > postulated
> > > > > > > > > > > > > > >> that we should support this behavior in XCom
> > > rather
> > > > > > > > > > > > > > >> than provide a
> > > > > > > > > > > > new
> > > > > > > > > > > > > > >> model in the db for TaskState. (Though
> > discussion
> > > > here
> > > > > > > > > > > > > > >> on which is
> > > > > > > > > > > > > more
> > > > > > > > > > > > > > >> appropriate is more than welcome.)
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> I'd like to put forward a proposal to
> resurrect
> > > the
> > > > > > > > > > > > > > >> reverted
> > > > > > > > > > > > > > >> #6370 <
> > > https://github.com/apache/airflow/pull/6370>
> > > > > in
> > > > > > > > > > > > > > >> order to
> > > > > > > > > > > provide a
> > > > > > > > > > > > > > >> modification to the lifetime of XComs under
> > > certain
> > > > > > > > > conditions.
> > > > > > > > > > > The
> > > > > > > > > > > > > > diagram
> > > > > > > > > > > > > > >> below helps illustrate the change originally
> > > > proposed
> > > > > > > > > > > > > > >> in
> > > > > > > > > #6370.
> > > > > > > > > > > > There
> > > > > > > > > > > > > > was
> > > > > > > > > > > > > > >> concern about changing existing behavior
> > > > (potentially
> > > > > > > > > > > > > > >> breaking)
> > > > > > > > > > > and
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> fact that this makes operators stateful. Per
> the
> > > > > review
> > > > > > > > > > > > > > >> comments
> > > > > > > > > > > and
> > > > > > > > > > > > > an
> > > > > > > > > > > > > > >> informal discussion (meetings notes <
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6
> > > > > > > > > > > Z0os
> > > > > > > > > > > es
> > > > > > > > > > > PG7jVZ3oU/edit#
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> and #sig-async-operators) I'd like to modify
> the
> > > > > > > > > > > > > > >> approach
> > > > > > > > > > > > > > >> #6370 to
> > > > > > > > > > > > > only
> > > > > > > > > > > > > > >> skip clearing of XCom if the Xom key is
> prefixed
> > > > with
> > > > > > > > > > > > > > >> `airflow.models.xcom.DO_NOT_CLEAR_PREFIX =
> > > > > > > > > > > > > > >> "_STATEFUL_"` or
> > > > > > > > > > > similar.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> [image: image.png]
> > > > > > > > > > > > > > >> --
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> *Jacob Ferriero*
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Strategic Cloud Engineer: Data Engineering
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> jferri...@google.com
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> 617-714-2509 <(617)%20714-2509>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > *Jacob Ferriero*
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Strategic Cloud Engineer: Data Engineering
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > jferri...@google.com
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 617-714-2509
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > --
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jarek Potiuk
> > > > > > > > > > > > > Polidea <https://www.polidea.com/> | Principal
> > > Software
> > > > > > > > > > > > > Engineer
> > > > > > > > > > > > >
> > > > > > > > > > > > > M: +48 660 796 129 <+48660796129>
> > > > > > > > > > > > > [image: Polidea] <https://www.polidea.com/>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Darren L. Weber, Ph.D.
> > > > > > > > > > > > http://psdlw.users.sourceforge.net/
> > > > > > > > > > > > http://psdlw.users.sourceforge.net/wordpress/
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > >
> > > > > > > > > > Jarek Potiuk
> > > > > > > > > > Polidea <https://www.polidea.com/> | Principal Software
> > > > Engineer
> > > > > > > > > >
> > > > > > > > > > M: +48 660 796 129 <+48660796129>
> > > > > > > > > > [image: Polidea] <https://www.polidea.com/>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > ==================================================================
> > > > > > > > > > ====
> > > > > > > > > > =========
> > > > > > > > > >
> > > > > > > > > > Please access the attached hyperlink for an important
> > > > electronic
> > > > > > > > > > communications disclaimer:
> > > > > > > > > >
> > > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> > > > > > > > > >
> > > > > ==================================================================
> > > > > > > > > > ====
> > > > > > > > > > =========
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Jarek Potiuk
> > > > > > > > > Polidea <https://www.polidea.com/> | Principal Software
> > > Engineer
> > > > > > > > >
> > > > > > > > > M: +48 660 796 129 <+48660796129>
> > > > > > > > > [image: Polidea] <https://www.polidea.com/>
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > >
> > ======================================================================
> > > > > > > > =========
> > > > > > > > >
> > > > > > > > > Please access the attached hyperlink for an important
> > > electronic
> > > > > > > > > communications disclaimer:
> > > > > > > > >
> > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> > > > > > > > >
> > > > > > > >
> > > > >
> > ======================================================================
> > > > > > > > =========
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > >
> > > > > > > > Jarek Potiuk
> > > > > > > > Polidea <https://www.polidea.com/> | Principal Software
> > Engineer
> > > > > > > >
> > > > > > > > M: +48 660 796 129 <+48660796129>
> > > > > > > > [image: Polidea] <https://www.polidea.com/>
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ===============================================================================
> > > > > > >
> > > > > > > Please access the attached hyperlink for an important
> electronic
> > > > > > > communications disclaimer:
> > > > > > > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ===============================================================================
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to