RE: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-13 Thread Shaw, Damian P.
Here are a couple more examples of where, as an Airflow user, I am introducing 
state into my tasks (so they technically break idempotency):
* Authentication mechanisms that return errors based on wrong details; state is 
kept so retries don't cause multiple login attempts, which would cause the 
account to be locked. In this case manual intervention is required to re-enable 
the tasks.
* Sensors that send email/SMS notifications on certain events; state is kept so 
they're not redundantly sent on retry/reschedule, and also so they can 
"escalate" in urgency each time they are sent out, even across 
retries/reschedules (a sketch follows).
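
A minimal sketch of how I hack that second pattern in today (the Variable key
scheme, condition_met(), and notify() are illustrative, not Airflow APIs):

    from airflow.models import Variable
    from airflow.sensors.base_sensor_operator import BaseSensorOperator

    class EscalatingSensor(BaseSensorOperator):
        """Sensor whose notifications escalate in urgency across reschedules."""

        def poke(self, context):
            if self.condition_met():  # hypothetical check of the watched event
                return True
            key = "escalation_{}_{}".format(self.dag_id, self.task_id)
            level = int(Variable.get(key, default_var=0)) + 1
            Variable.set(key, level)  # state survives retries/reschedules
            notify(urgency=level)     # hypothetical alerting helper
            return False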

I understand wanting to keep Airflow tasks as stateless as possible, but from my 
perspective real-world systems are not stateless (or without side effects), and 
modeling that state is sometimes the only viable solution. The motivation of 
this thread is that some users are saying they need state; I can tell you at 
least some users are already hacking state into their existing Airflow setups 
and would greatly appreciate it if, when implemented, it doesn't have arbitrary 
limits on its use :). My 2 cents as a user anyway.

Damian


Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-13 Thread Daniel Standish
@Jarek & Fokko
Firstly, thank you for the thorough and thoughtful replies -- I appreciate
it.

I recognize that the execution_date-based idempotence pattern is core to
airflow.  I understand that it motivated a lot of the design decisions, and
that it is a pattern that is endorsed and explicitly encouraged in the
documentation, and by the community in general.

But surely it's not the position of the airflow project that data pipelines
deviating from this pattern are officially forbidden and unsupported?

I think there is a place in airflow for tasks that are not driven by
"execution_date", as well as tasks for which "airflow backfill" does not
make sense.  Airflow provides great flexibility, and a lot of value and
applications beyond that one pattern.

I believe there are cases where it makes sense to persist some information
for a task or a dag.  Incremental processes are one.  The reschedule poke
operator is another.  Breaking a heavy data transfer process into
sub-batches and storing progress is another.  And I'd be in favor of
providing a better built-in mechanism to do it within airflow.  I would not
constrain it to async job ids; just arbitrary data however the user wants
to use it.

I am not convinced that adding the capacity to store information associated
with dag+task undermines or meaningfully alters the airflow project.

I would bet that pipelines not following the canonical pattern are pretty
common in the wild.  And when you don't for whatever reason lean on
execution_date, it's nice to have a different mechanism for storing state.

That's my take.

Detailed responses below.

@Jarek
RE: the "cluster maintainer" role, and the "idempotence guarantee", and "by
adding state you lose the property of idempotency"

I don't think airflow provides any such guarantee as it is.

   - By themselves, even built-in airflow operators do not in general
     provide an idempotence guarantee.  It really depends how they are used.
       - For example, a _SqlOperator has a `sql` field.  Whether the task is
         idempotent or not depends entirely on the `sql` that is passed to
         the operator (see the sketch below).
   - With non-built-in operators, all bets are off.
   - Variables and XCom can be used to persist and transmit state.
   - People use catchup=False to facilitate non-idempotent tasks.
   - As a scheduler, it can execute arbitrary processes.

These facts mean that as a cluster maintainer, if there is an idempotence
guarantee, it's because the team enforces it -- not because airflow
guarantees it.  Airflow is simply unable to enforce this by itself.
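
To make the _SqlOperator point concrete, a sketch (table names and the
surrounding `dag` object are assumed):

    from airflow.operators.postgres_operator import PostgresOperator

    # Non-idempotent: re-running the same interval appends duplicate rows.
    append = PostgresOperator(
        task_id="load_append",
        sql="INSERT INTO fact SELECT * FROM staging WHERE ds = '{{ ds }}'",
        dag=dag,  # assumes an existing DAG object
    )

    # Idempotent: re-running wipes the interval's partition, then reloads it.
    replace = PostgresOperator(
        task_id="load_replace",
        sql=[
            "DELETE FROM fact WHERE ds = '{{ ds }}'",
            "INSERT INTO fact SELECT * FROM staging WHERE ds = '{{ ds }}'",
        ],
        dag=dag,
    )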

@Jarek & Fokko
RE: you are using the wrong tool for this job, airflow is not for
streaming, you should look at apache beam, read "beyond the horizon"

Incremental processes are not necessarily streaming processes.  The
incremental scenarios I outlined are still batch processes -- hourly, every
4 hours, daily, that kind of thing.  I am just determining the intervals in
a different way from an execution_date-driven process.
Should I use another tool?  Fundamentally, what we're talking about is the
ability to store a small bit of data associated with a task+dag.  When
I consider this gap in the context of everything airflow provides, it's
insignificant by comparison, and there are ways I can solve it:

   - xcom (notwithstanding edge case problems e.g. getting cleared)
   - variables (notwithstanding messiness)
   - roll your own model (notwithstanding effort and complexity on my
     system)
       - incidentally, because of airflow's plugin system, if you do roll
         your own, you can even expose your in-house model through an admin
         view in airflow! (sketch below)
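
As an aside, a rough sketch of that last point with the legacy flask-admin UI
(the TaskState model and its module path are hypothetical):

    from airflow.plugins_manager import AirflowPlugin
    from airflow.settings import Session
    from flask_admin.contrib.sqla import ModelView

    from my_company.models import TaskState  # hypothetical in-house model

    class TaskStatePlugin(AirflowPlugin):
        """Expose the in-house state table as an admin view in Airflow."""
        name = "task_state_plugin"
        admin_views = [ModelView(TaskState, Session, name="Task State")]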

Nonetheless, I don't think we should *make* users solve it.  And I don't
think we need, e.g., beam or spark to schedule sql scripts merely because
they are incremental instead of idempotent.

@Jarek
RE: adding support for state adds more work for committers
Any new feature can create more work.  But there is
also work it may eliminate.  By providing explicit support for this design
pattern, users would not have to force it into XCom or Variable.  We could
have a docs page "howto/incremental_process".  This could reduce questions
on "how can I do an incremental process?".  By *not* providing support, the
burden is shifted to users, who will find a way, but at some cost.
As you have said, we could tell users this is not supported, but why turn
someone away when the platform can actually handle that pattern just fine?

@Fokko
RE:

> >  This value needs to be stored somewhere.
> This data is already available, but would you want to duplicate this in
> Airflow for optimization, my answer would be: no.


Let me try to clarify.  The context here was a dim / fact / reporting
process that is downstream of several tables with varying delivery
cadences.
When the incremental load begins, you need to record the state of the source
tables.  Storing the min of the max modified dates from the source tables
allows us to know what we need 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Jarek Potiuk
Daniel:

On Sat, Jan 11, 2020 at 11:10 PM Daniel Standish 
wrote:

> To banish anything stateful seems arbitrary and unnecessary.  Airflow is
> more than its canonical task structure: hook / operator framework and
> ecosystem, scheduling, retry, alerting, distributed computing, etc etc etc
> etc.
>

I think we should be really conscious and deliberately decide what Airflow
does and what it does not. It's a glorified CRON with fixed intervals to
process the data. That's about it. I think we should not turn it into a
generic DAG executor to handle more cases. There are plenty of more or less
generic DAG workflow execution engines, and our goal is not to write a
generic DAG workflow engine and replace them. For me this is a really basic
assumption - changing it would mean completely changing the direction of
the project. This assumption is pretty much foundational for Airflow. It's
the kind of base we should look back at and ask "does the change fit that
basic assumption?" whenever we make any serious decision. I really like the
idea of doing one thing very well, and I think Airflow is that kind of
tool. IMHO - we should not make it easier to use for cases it was not
designed for, even if we can.

As long as support for the canonical task is preserved, what's the harm in
> supporting stateful usage where it makes sense?
>

The harm is that we will have to implement it, answer questions, and support
forever all the use cases people might come up with for such a "state".
People are creative, and once they have such a generic feature in their
hands they will use it for various things. By being opinionated, we won't
handle all such cases - and we can simply answer people who want to (ab)use
it: "it's not the intention of Airflow". Of course we risk that Airflow
will not be used by those people in those cases ... But I think this is
exactly what we want, in fact. I'd love people to use Apache Beam for
streaming and incremental processing. It's a fantastic tool for that.


> Airflow may not have been designed initially to support incremental
> processes.  But it is a living thing, and as it happens, it can work well
> for them.
>

I think this is a case of "if you have a hammer, everything looks like a
nail". The fact that it can does not mean it's the best tool for that, or
that you are using it properly.


> I think the two approaches can coexist harmoniously.
>

I don't think so. By adding state you lose the idempotency property - which
is, again, a foundational assumption for all operators. We wrote 100s of
operators so far, and idempotency was often the difficult part. This means
you had to work a bit harder to have a good, idempotent operator. But by
doing so, your users can simply rely on the DAG. At any point in time they
can backfill the DAG from a month ago for a given day, and they do not have
to worry about it. This is THE most important feature of Airflow, I think.
You can have 100s or 1000s of DAGs in your company and have one person
operate all of them - DAGs written by 10s of other people. As an operator,
you do not have to know any details about how each operator and DAG works -
what you know is that you can re-run/backfill any portion of a DAG from the
past and that it will work. When you know you have to fix some portion of
data, and you fixed the algorithms or reference data or cleanup process,
you do not have to understand how it all works. You simply backfill. By
adding "maybes" to the whole picture (this is what stateful tasks are about
in this context - it "may" work when backfilled, but not necessarily) we
are undermining the basic trust the operator might have in backfilling
tasks. Of course it's a bit of an oversimplification, but it reflects the
most important (for me) usage and reason why people would like to use
Airflow. I think it is really important not to have "maybes" here and to be
opinionated - this leads to trust in Airflow.

J.



-- 

Jarek Potiuk
Polidea  | Principal Software Engineer

M: +48 660 796 129


Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Jarek Potiuk
On Sun, Jan 12, 2020 at 3:59 AM Jacob Ferriero 
wrote:

> I am also still in the "solve this in XCom" camp, but agree now that prefix
> is an awkward solution and could be difficult / confusing to users.
>

While I understand the reasons, I am really with Kaxil here on not abusing
XCom for that. I still think of the reschedule story as not something
user-facing. I think the user, in their DAG, should not be able to interact
with the state stored for rescheduling (following the whole discussion
about being state-less/ful). I am now strongly in the "no state for the
users" camp. I think we do not really want our users to shoot themselves in
the foot even if they really want to, and Airflow being opinionated about
it is a GoodThing(TM). Of course, whatever we come up with, you would be
able to run a DB query and read the value if you really, really want, but
XCom is an official way to access state and it's a bit too easy this way. I
think it should be an implementation detail that users should not rely on
nor use when writing their DAGs (just use it as intended, in the
poke-reschedule way).

The two compelling reasons  I have for keeping this in XCom are:
> 1) I think a new table for task state would be nearly identical to XCom
>

Depends. If you want to add just a field I think we have better options.


> 2) (bear with me here as this is a bit long and I am going to use the word
> "task" informally denoted by quotes)
>
> *Proposed Changes to XCom*
> - Add a state column (boolean or nullable string)
> - If state column is false/null then clear XCom at beginning of task
> (maintain current behavior)
>
>

I think even if we decide to use XCom (see below) we should really avoid
such general "state" naming. It should be a feature dedicated to
rescheduling, and any field name should reflect that this is its purpose
and its only purpose. A generic name like "state" invites abuse and the
introduction of real "state", which we want to avoid.


> Another interesting question is should the rescheduled task instances
> (pokes) be allowed to mutate this state? Not useful for polling for job
> completion but might be useful for other kinds of rescheduled operations
> after an initialized state.
>

Good question. I think it should be allowed to mutate it. Such an id could
theoretically change over time - for example, it could contain some "last
check timestamp" or the SHA or UUID of the last request, or similar. It can
be helpful to optimise routes/checks for the service we interact with. Not
very likely, but I think there is no harm in being able to mutate it.

Jarek,
> As far as adding a state column to the TaskReschedule table, my
> understanding is this table keeps track of task reschedules: its first
> entry is the first reschedule (not the first / originally scheduled task
> instance) would the state value be duplicated for each reschedule or always
> read from the first reschedule for a given task / dag / execution date?
> Having the state associated w/ reschedules but not the original task
> instance seems confusing to me (though technically possible to work with).
> I suppose I'm just reiterating what Fokko said about XCom granularity
> seeming more appropriate.
>

Agree. It's not best with TaskReschedule. I looked at the code and you are
completely right about it.

However, I think we have one far better place for such a new column:
TaskInstance.
It seems perfect - it has the right primary key, there is only one per task
instance, and we already access it while rescheduling.
How about adding a new "poke_reschedule_id" column or similar to
TaskInstance?
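
A sketch of what that could look like as a migration (revision boilerplate
omitted; the column name follows the suggestion above):

    from alembic import op
    import sqlalchemy as sa

    def upgrade():
        # One nullable column per task instance to hold the reschedule handle.
        op.add_column(
            "task_instance",
            sa.Column("poke_reschedule_id", sa.String(250), nullable=True),
        )

    def downgrade():
        op.drop_column("task_instance", "poke_reschedule_id")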

J.


> --
>
Jarek Potiuk
Polidea  | Principal Software Engineer

M: +48 660 796 129


Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Driesprong, Fokko
Kaxil,

I would not use a prefix, but a reserved key to have a single state field
for the (dag, task, execution_date) tuple. When fetching the xcom values,
we have to exclude the task that fetches these values.

Daniel,

Thanks for the scenarios.

The first scenario could become idempotent if you fetch everything up to the
execution time. Also, if you have a modern data platform, fetching the
watermark can be a constant-time operation. I've also mentioned this on
Slack, but you can:
- Keep statistics of the column in Hive
- Fetch the max from the footers in the Parquet file, so you don't need to
read the actual data (see the sketch below)
- More recent formats, like Iceberg and Delta Lake, have the min/max of the
column available as well, and this operation is even constant-time.
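
A sketch of the Parquet-footer trick using pyarrow (the file path and column
name are illustrative):

    import pyarrow.parquet as pq

    def max_from_footer(path, column):
        """Max value of a column read from Parquet metadata only, no data scan."""
        meta = pq.ParquetFile(path).metadata
        maxes = []
        for rg in range(meta.num_row_groups):
            for ci in range(meta.num_columns):
                col = meta.row_group(rg).column(ci)
                stats = col.statistics
                if col.path_in_schema == column and stats is not None \
                        and stats.has_min_max:
                    maxes.append(stats.max)
        return max(maxes) if maxes else None

    # e.g. watermark = max_from_footer("/data/part-0.parquet", "datetime_modified")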

>  This value needs to be stored somewhere.

This data is already available; would you want to duplicate it in Airflow
for optimization? My answer would be: no.

Jarek,

> I believe the whole idea of Airflow is to operate on fixed time intervals.
> We always have fixed intervals and if we re-run an interval processing
it's
> always "all-or-nothing" for that interval. I.e. we do not store or care
for
> watermark. If we decide to re-process an interval of data, we always do it
> for all the data for that interval at the source -> replacing the whole
> "interval-related" data in the output. We are never supposed to process
> incremental data. This is a very basic and fundamental assumption of
> Airflow - that it operates on fixed ("batch") intervals. If you want to
> process "streaming" data where you care for watermarks and "incremental"
> processing you should use other systems - Apache Beam might be a good
> choice for that for example.

This is similar to how I see Airflow. For example, reading from S3 using a
templated path like s3a://bucket/table/dt=2019-01-10/, which then replaces a
single day partition in your favorite database.
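
Something like this, sketched (DAG id and the load command are illustrative;
{{ ds }} is Airflow's templated execution date):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG("daily_partition_load", schedule_interval="@daily",
              start_date=datetime(2019, 1, 1))

    load = BashOperator(
        task_id="replace_partition",
        # Re-running any day just re-replaces that day's partition: idempotent.
        bash_command="spark-submit load.py "
                     "--input s3a://bucket/table/dt={{ ds }}/ "
                     "--replace-partition dt={{ ds }}",
        dag=dag,
    )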

Daniel,

> To banish anything stateful seems arbitrary and unnecessary.  Airflow is
> more than its canonical task structure: hook / operator framework and
> ecosystem, scheduling, retry, alerting, distributed computing, etc etc etc
> etc.

I think this paragraph is trying to say that:
https://github.com/apache/airflow#beyond-the-horizon
Besides that, and I've already mentioned this earlier: I think there is a
place for state within Airflow, but the scenarios that you describe can
also be handled without keeping state. Of course you will sacrifice a bit
of performance. Having state makes things more complicated, and it should
only be used when there are no other options.

Keeping something like the Job ID of your (Athena, BigQuery) job that you're
tracking between async polls makes sense to me. But something like the
watermarks would not be my choice. Also, as committers we need to make
sure, when there are stateful operators, that we handle the evolution of
the state properly. If you update your operator in Airflow and it hits some
old state written a couple of versions back, it should still work.

Cheers, Fokko

On Sat, 11 Jan 2020 at 23:10, Daniel Standish wrote:

> To banish anything stateful seems arbitrary and unnecessary.  Airflow is
> more than its canonical task structure: hook / operator framework and
> ecosystem, scheduling, retry, alerting, distributed computing, etc etc etc
> etc.
>
> As long as support for the canonical task is preserved, what's the harm in
> supporting stateful usage where it makes sense?
>
> Airflow may not have been designed initially to support incremental
> processes.  But it is a living thing, and as it happens, it can work well
> for them.
>
> I think the two approaches can coexist harmoniously.
>
>
>
>
> On Sat, Jan 11, 2020 at 1:33 PM Jarek Potiuk 
> wrote:
>
> > Pandora's box it is indeed :)
> >
> > @Maxime Beauchemin   -> maybe you could
> chime
> > in here. I think of you still as the gatekeeper (or at least a Yoda
> master)
> > of the very basic ideas behind Apache Airflow, and I think your insight
> > here would be really valuable.
> >
> > >
> > > *Scenario 1: incremental data pull*
> > > If you are incrementally pulling data from a database.  Each time you
> > only
> > > want to pull the records that are modified.  So you want to keep track
> of
> > > `datetime_modified` column.
> > > Each run, you check the max modified date in source and store it.  This
> > is
> > > your "high watermark" for this run.  Next run, you pull from last high
> > > watermark.
> > > In a sense you can't really design this process to be idempotent: if
> you
> > > rerun the interval ('2019-12-01T10:00:00', '2019-12-01T11:00:00') you
> > might
> > > not get the same data (or any data at all) because in the source, those
> >
> > records may have been updated (with new modified date).
> > >
> >
> > I believe the whole idea of Airflow is to operate on fixed time
> intervals.
> > We always have fixed intervals and if we re-run an interval processing
> it's
> > always "all-or-nothing" for that interval. I.e. we do not store or 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Daniel Standish
To banish anything stateful seems arbitrary and unnecessary.  Airflow is
more than its canonical task structure: hook / operator framework and
ecosystem, scheduling, retry, alerting, distributed computing, etc etc etc
etc.

As long as support for the canonical task is preserved, what's the harm in
supporting stateful usage where it makes sense?

Airflow may not have been designed initially to support incremental
processes.  But it is a living thing, and as it happens, it can work well
for them.

I think the two approaches can coexist harmoniously.




On Sat, Jan 11, 2020 at 1:33 PM Jarek Potiuk 
wrote:

> Pandora's box it is indeed :)
>
> @Maxime Beauchemin   -> maybe you could chime
> in here. I think of you still as the gatekeeper (or at least a Yoda master)
> of the very basic ideas behind Apache Airflow, and I think your insight
> here would be really valuable.
>
> >
> > *Scenario 1: incremental data pull*
> > If you are incrementally pulling data from a database.  Each time you
> only
> > want to pull the records that are modified.  So you want to keep track of
> > `datetime_modified` column.
> > Each run, you check the max modified date in source and store it.  This
> is
> > your "high watermark" for this run.  Next run, you pull from last high
> > watermark.
> > In a sense you can't really design this process to be idempotent: if you
> > rerun the interval ('2019-12-01T10:00:00', '2019-12-01T11:00:00') you
> might
> > not get the same data (or any data at all) because in the source, those
>
> records may have been updated (with new modified date).
> >
>
> I believe the whole idea of Airflow is to operate on fixed time intervals.
> We always have fixed intervals and if we re-run an interval processing it's
> always "all-or-nothing" for that interval. I.e. we do not store or care for
> watermark. If we decide to re-process an interval of data, we always do it
> for all the data for that interval at the source -> replacing the whole
> "interval-related" data in the output. We are never supposed to process
> incremental data. This is a very basic and fundamental assumption of
> Airflow - that it operates on fixed ("batch") intervals. If you want to
> process "streaming" data where you care for watermarks and "incremental"
> processing you should use other systems - Apache Beam might be a good
> choice for that for example.
>
>
> > *Scenario 2: incremental dim / fact / reporting processes in database*
> > Suppose I am building a fact table.  it has 10 source tables.   I need to
> > make this run incrementally.  It's possible that there may be differences
> > in update cadence in source tables.  One way to approach this is in each
> > run you calculate max modified in each source table, and take the min of
> > all of them.  That's your high watermark for this run.  Next time, you
> have
> > to process data from there.  This value needs to be stored somewhere.
> >
>
> Again - if you are not able to split the data into fixed intervals, and
> cannot afford re-processing of the whole interval of data rather than
> incremental processing, you should look for another solution. Airflow is
> not designed (and I think it should never do it) for streaming/incremental
> data processing. It is designed to handle fixed-time batches of data.
> Airflow is not about optimising and processing as little data as possible.
> It's all about processing fixed intervals fully so that the processing can
> be as simple as possible - at the expense of sometimes processing the same
> data again-and-again.
>
> *Scenario 3: dynamic date range / resuming large "initial load" processes*
> > Pulling data from APIs, often we might want it to run daily or hourly.
> > Using backfill functionality is sometimes prohibitively slow because you
> > have to carve up years of data into hourly or daily chunks.  One approach
> > is make a temporary `backfill_` job with modified schedule (e.g. monthly)
> > and let that run from beginning of time (with catchup=True).
> > Alternatively you could instead design in stateful way.  On initial run,
> > pull from beginning of time.  Thereafter, pull from last run time (and
> > maybe you need to do a lookback of 45 days or something, because data in
> > source can change).  Perhaps in your initial load you don't want to pull
> by
> > day (too slow) but you also don't want to pull in one batch -- so you
> carve
> > up batches that are appropriate to the situation.  And this is where it's
> > helpful to have a state persistence mechanism: you can use this to store
> > your progress on initial load, and in the event of failure, resume from
> > point of failure.  Yes you _could_ parse it from s3 or wherever, but
> doing
> > so presents its own challenges and risks, and it is convenient to just
> > store it in the database -- and not necessarily any more problematic.
> >
>
> Same here - if your data source is not providing data in fixed intervals, I
> think Apache Airflow might not be the best choice.
>
>
> >
> 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Jarek Potiuk
Pandora's box it is indeed :)

@Maxime Beauchemin   -> maybe you could chime
in here. I think of you still as the gatekeeper (or at least a Yoda master)
of the very basic ideas behind Apache Airflow, and I think your insight
here would be really valuable.

>
> *Scenario 1: incremental data pull*
> If you are incrementally pulling data from a database.  Each time you only
> want to pull the records that are modified.  So you want to keep track of
> `datetime_modified` column.
> Each run, you check the max modified date in source and store it.  This is
> your "high watermark" for this run.  Next run, you pull from last high
> watermark.
> In a sense you can't really design this process to be idempotent: if you
> rerun the interval ('2019-12-01T10:00:00', '2019-12-01T11:00:00') you might
> not get the same data (or any data at all) because in the source, those

> records may have been updated (with new modified date).
>

I believe the whole idea of Airflow is to operate on fixed time intervals.
We always have fixed intervals, and if we re-run an interval's processing
it's always "all-or-nothing" for that interval. I.e. we do not store or
care for a watermark. If we decide to re-process an interval of data, we
always do it for all the data for that interval at the source -> replacing
the whole "interval-related" data in the output. We are never supposed to
process incremental data. This is a very basic and fundamental assumption
of Airflow - that it operates on fixed ("batch") intervals. If you want to
process "streaming" data, where you care for watermarks and "incremental"
processing, you should use other systems - Apache Beam might be a good
choice for that, for example.


> *Scenario 2: incremental dim / fact / reporting processes in database*
> Suppose I am building a fact table.  it has 10 source tables.   I need to
> make this run incrementally.  It's possible that there may be differences
> in update cadence in source tables.  One way to approach this is in each
> run you calculate max modified in each source table, and take the min of
> all of them.  That's your high watermark for this run.  Next time, you have
> to process data from there.  This value needs to be stored somewhere.
>

Again - if you are not able to split the data into fixed intervals, or
cannot afford re-processing the whole interval of data rather than
processing incrementally, you should look for another solution. Airflow is
not designed (and I think it never should be) for streaming/incremental
data processing. It is designed to handle fixed-time batches of data.
Airflow is not about optimising and processing as little data as possible.
It's all about processing fixed intervals fully, so that the processing can
be as simple as possible - at the expense of sometimes processing the same
data again and again.

*Scenario 3: dynamic date range / resuming large "initial load" processes*
> Pulling data from APIs, often we might want it to run daily or hourly.
> Using backfill functionality is sometimes prohibitively slow because you
> have to carve up years of data into hourly or daily chunks.  One approach
> is make a temporary `backfill_` job with modified schedule (e.g. monthly)
> and let that run from beginning of time (with catchup=True).
> Alternatively you could instead design in stateful way.  On initial run,
> pull from beginning of time.  Thereafter, pull from last run time (and
> maybe you need to do a lookback of 45 days or something, because data in
> source can change).  Perhaps in your initial load you don't want to pull by
> day (too slow) but you also don't want to pull in one batch -- so you carve
> up batches that are appropriate to the situation.  And this is where it's
> helpful to have a state persistence mechanism: you can use this to store
> your progress on initial load, and in the event of failure, resume from
> point of failure.  Yes you _could_ parse it from s3 or wherever, but doing
> so presents its own challenges and risks, and it is convenient to just
> store it in the database -- and not necessarily any more problematic.
>

Same here - if your data source is not providing data in fixed intervals, I
think Apache Airflow might not be the best choice.


>
> *Scenario 4: no access*
> As pointed out earlier, sometimes you might not have access to target.
> E.g. i am pushing to someone elses s3 bucket and we only have PutObject but
> can't read what's there.  So we can't look at target to infer state.
>
> I'm sure there are other use cases out there.  Anything "incremental"
> implies a state.
>

That's the point where I think there might be a problem. Airflow is not
designed to support incremental sources of data, and trying to bend Airflow
to such a use case is probably not a good idea. Maybe it's just the same as
trying to use an axe to hammer a nail. It will work sometimes, but maybe
it's better to use a hammer instead.

J.


Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Daniel Standish
@Kaxil

> If you can give us more scenarios where
> you think stateful set can solve the issue, please let us know. Also, why
> do you think Variables are not the correct solution for it?



*Scenario 1: incremental data pull*
Suppose you are incrementally pulling data from a database.  Each time you
only want to pull the records that were modified, so you keep track of a
`datetime_modified` column.
Each run, you check the max modified date in the source and store it.  This
is your "high watermark" for this run.  Next run, you pull from the last
high watermark.
In a sense you can't really design this process to be idempotent: if you
rerun the interval ('2019-12-01T10:00:00', '2019-12-01T11:00:00') you might
not get the same data (or any data at all) because in the source, those
records may have been updated (with a new modified date).
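
For reference, roughly how I persist that watermark today -- a sketch using
an Airflow Variable (the key, table, and columns are illustrative):

    from airflow.models import Variable

    WM_KEY = "my_table_high_watermark"  # illustrative Variable key

    def pull_new_rows(hook):
        """Pull rows modified since the stored watermark, then advance it.
        `hook` is any DbApiHook (e.g. PostgresHook)."""
        wm = Variable.get(WM_KEY, default_var="1970-01-01 00:00:00")
        rows = hook.get_records(
            "SELECT id, payload, datetime_modified "
            "FROM my_table WHERE datetime_modified > %(wm)s",
            parameters={"wm": wm},
        )
        if rows:
            Variable.set(WM_KEY, str(max(r[2] for r in rows)))
        return rows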

*Scenario 2: incremental dim / fact / reporting processes in database*
Suppose I am building a fact table.  It has 10 source tables.  I need to
make this run incrementally.  It's possible that there are differences in
update cadence across the source tables.  One way to approach this: in each
run you calculate the max modified date in each source table and take the
min of all of them.  That's your high watermark for this run.  Next time,
you have to process data from there.  This value needs to be stored
somewhere.
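
A sketch of that min-of-max calculation (table and column names are
illustrative):

    def compute_high_watermark(hook, source_tables):
        """Min over each table's max(datetime_modified): the latest point up
        to which *all* sources are known to be complete."""
        maxes = [
            hook.get_first("SELECT MAX(datetime_modified) FROM {}".format(t))[0]
            for t in source_tables
        ]
        return min(m for m in maxes if m is not None)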


*Scenario 3: dynamic date range / resuming large "initial load" processes*
When pulling data from APIs, we often want a task to run daily or hourly.
Using backfill functionality is sometimes prohibitively slow because you
have to carve up years of data into hourly or daily chunks.  One approach
is to make a temporary `backfill_` job with a modified schedule (e.g.
monthly) and let that run from the beginning of time (with catchup=True).
Alternatively, you could design it in a stateful way.  On the initial run,
pull from the beginning of time.  Thereafter, pull from the last run time
(and maybe you need a lookback of 45 days or something, because data in the
source can change).  Perhaps in your initial load you don't want to pull by
day (too slow), but you also don't want to pull in one batch -- so you
carve up batches that are appropriate to the situation.  And this is where
it's helpful to have a state persistence mechanism: you can use it to store
your progress on the initial load and, in the event of failure, resume from
the point of failure.  Yes, you _could_ parse it from S3 or wherever, but
doing so presents its own challenges and risks, and it is convenient to
just store it in the database -- and not necessarily any more problematic.
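
A sketch of that resume mechanism (the Variable key and the copy callable
are illustrative):

    from airflow.models import Variable

    PROGRESS_KEY = "initial_load_batches_done"  # illustrative Variable key

    def run_initial_load(copy_batch, batches):
        """copy_batch(start, end) performs one transfer; `batches` is the
        list of (start, end) ranges carved up for the situation."""
        done = int(Variable.get(PROGRESS_KEY, default_var=0))
        for i, (start, end) in enumerate(batches):
            if i < done:
                continue  # completed in a previous (failed) attempt
            copy_batch(start, end)
            Variable.set(PROGRESS_KEY, i + 1)  # persist after each batch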

*Scenario 4: no access*
As pointed out earlier, sometimes you might not have access to the target.
E.g. I am pushing to someone else's S3 bucket and we only have PutObject,
but can't read what's there.  So we can't look at the target to infer
state.

I'm sure there are other use cases out there.  Anything "incremental"
implies a state.

*Why not variables?*
For sure they can be used for this kind of thing.  But it's a tad messy.  I
think if variables had a `namespace` column added to the primary key, that
would help.

I have experimented with two different alternative approaches at my
organization: ProcessState and TaskState.  And so far I like both of them.

Note: I am not suggesting these tables should be added to airflow -- I am
just sharing, in response to the question why not variables, and in the
context of thinking about how state can be handled more generally.

*TaskState* has primary key of dag_id + task_id with a `value` column that
is json data.  Users can persist arbitrary state there for their task.  It
is essentially a variable that is tied to a specific task.  The benefit of
this compared to variable is there is no need to think about naming
convention -- the operator can handle that for you because it knows dag id
and task id.   It's not TaskInstanceState; i.e. the scope is the *task*,
and not the task *run*.

I was gonna say that TaskState is equivalent to adding a state_data column
to the `task` table but there is no such table :)

I also created a *ProcessState* table.  ProcessState has a primary key of
namespace + process_name.  Otherwise it is identical to TaskState.  The
cost of using ProcessState is you have to choose a namespace and
process_name.  The reward, however, is that you can now freely move a task
from one dag to another (or rename it) without having to perform any
updates in the metastore.  So if we need to move a process from one
schedule to another, we can.  This you cannot do with XCom or TaskState.

ProcessState essentially is Variable but with an added `namespace` that
helps with organization and naming that indicates its specific purpose.

I like TaskState because you don't have to think about naming.  But for
some processes it's nice to be able to change the schedule, i.e. move it to
a different dag easily.

When I use ProcessState, the process_name is driven by the target object,
e.g. table name, and the naming is controlled by a helper class.
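
For concreteness, minimal sketches of the two tables as SQLAlchemy models
(again, illustrative -- not a proposal to add them to airflow):

    from sqlalchemy import Column, String, Text
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class TaskState(Base):
        """State scoped to the *task* (dag_id + task_id), not the task run."""
        __tablename__ = "task_state"
        dag_id = Column(String(250), primary_key=True)
        task_id = Column(String(250), primary_key=True)
        value = Column(Text)  # arbitrary JSON-serialized state

    class ProcessState(Base):
        """Keyed by namespace + process_name, so the state survives moving
        or renaming the task across dags."""
        __tablename__ = "process_state"
        namespace = Column(String(250), primary_key=True)
        process_name = Column(String(250), primary_key=True)
        value = Column(Text)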

One 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Kaxil Naik
@Fokko:
If we go ahead with storing this info in Xcom, is your suggestion to use a
fixed Prefix?

Cheers,
Kaxil

On Sat, Jan 11, 2020, 14:50 Driesprong, Fokko  wrote:

> I would still be in favor of pushing this into xcom, however not
> changing the behavior of the current xcom implementation. Xcom is now for
> intra-communication, but it could also be inter-communication, for me it is
> very closely related. So having an additional option to explicitly include
> the state of the current operator. TaskReschedule, as Jarek
> mentioned, would also be an option, but this is on a TaskRun level and I
> believe it should be on a Task level. As mentioned earlier, Variables feels
> very messy to me. This is on a global level, so then you should template
> the dag_id, task_id in there? It would also create a lot of entries in the
> table.
>
> Regarding the FTP issues. Airflow is not going to magically solve your FTP
> connections. If the FTP server is down, then it is okay for the operator to
> fail, and retry somewhere later and then the FTP server is hopefully back
> up. If there are flakey network issues, then you should implement some
> retrying mechanism. We had a similar use case when using HTTP. An Airflow
> user was listing through a paged REST API. If fetching one the pages
> failed, the operator would fail and it had to start all over again. Using
> Tenacity this has been fixed:
>
> https://github.com/apache/airflow/blob/fd78c65cabae2241a4c1d3a792e00620049cbf3e/airflow/hooks/http_hook.py#L186
>
> Ideally, you would like to have a path that contains the day in the FTP
> path, so you know which files to copy for which day, and you can also
> backfill this. This would exclude race conditions since you can do multiple
> days in parallel.
>
> Using xcom this would also be possible. First have an operator that will
> list the files on the FTP site, push this to xcom. Have another operator
> that fetches the files that you already have and push this to xcom as well.
> Using a Python operator you can easily do a diff, and then you know which
> files to download. In this case, you should limit the active dag-runs to
> one, to avoid race conditions.
>
> I believe that for many use cases you don't need to keep state in Airflow,
> and this might be convenient, but it is just shifting the issue. If you can
> fetch the issue from somewhere external, and this is the one and single
> truth, then this should be the preferred solution.
>
> Cheers, Fokko
>
> On Sat, 11 Jan 2020 at 04:21, Kaxil Naik wrote:
>
> > Hey all,
> >
> > Really good document Jacob.
> >
> > Below are my thoughts on different topics discussed in the docs and the
> > mailing list:
> >
> >
> > *Prefix on Xcom*
> > I don't think that is a good idea to mix this into Xcom. We should let
> Xcom
> > be used for exactly one purpose.
> >
> > *Storing state in Xcom between Retries*
> > This is definitely going to break idempotency. When the default retries
> are
> > enabled this is going to create undesired effects.
> >
> > @Daniel Standish : I would like to more understand the needs of Stateful
> > sets for sure. If you can give us more scenarios where
> > you think stateful set can solve the issue, please let us know. Also, why
> > do you think Variables are not the correct solution for it?
> >
> > I would imagine your custom operator can store some state in Variables.
> For
> > example, you can store a json containing the following in
> > Airflow Variables:
> >
> >- all_files
> >- files_copied
> >
> > The variable, in this case, would have the details it needs to resume the
> > copying from where it stopped. You custom operator as a first
> > the step should check the Variable (with deserialized JSON).
> >
> > *The new structure for storing Stateful structure for reschedules*
> > This can be a new table that has a relationship with the TI table or
> just a
> > new column and this column can be loaded only when using
> > Reschedule/async operators or sensors.
> >
> > Regards,
> > Kaxil
> >
> > On Fri, Jan 10, 2020 at 11:45 PM Yingbo Wang  wrote:
> >
> > > The updated AIP with smart sensor design and some implementation is in
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
> > > Open source PR https://github.com/apache/airflow/pull/5499
> > >
> > >
> > > On Fri, Jan 10, 2020 at 1:44 PM Alex Guziel  > > .invalid>
> > > wrote:
> > >
> > > > I feel like for this, we can incorporate the smart sensor we have
> > > > implemented at Airbnb that we plan on open sourcing.
> > > >
> > > > The TL;DR is that it works by having the Sensor task run briefly and
> > > > materialize some state into the DB which master sensor tasks poke
> for.
> > > This
> > > > can be with custom time intervals.
> > > >
> > > > On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish <
> dpstand...@gmail.com>
> > > > wrote:
> > > >
> > > > > I also am a big fan of adding better support for stateful tasks,
> > > though I
> > > > > know 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-11 Thread Driesprong, Fokko
I would still be in favor of pushing this into XCom, while not changing the
behavior of the current XCom implementation. XCom is now for
intra-communication, but it could also be used for inter-communication; for
me they are very closely related. So we would have an additional option to
explicitly include the state of the current operator. TaskReschedule, as
Jarek mentioned, would also be an option, but this is on a TaskRun level
and I believe it should be on a Task level. As mentioned earlier, Variables
feel very messy to me. They are on a global level, so then you would have
to template the dag_id and task_id into the key? It would also create a lot
of entries in the table.

Regarding the FTP issues: Airflow is not going to magically solve your FTP
connections. If the FTP server is down, then it is okay for the operator to
fail and retry somewhere later, when the FTP server is hopefully back up.
If there are flaky network issues, then you should implement some retrying
mechanism. We had a similar use case using HTTP. An Airflow user was
listing through a paged REST API. If fetching one of the pages failed, the
operator would fail and it had to start all over again. Using Tenacity this
has been fixed:
https://github.com/apache/airflow/blob/fd78c65cabae2241a4c1d3a792e00620049cbf3e/airflow/hooks/http_hook.py#L186
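
For reference, a sketch of that retry pattern (the connection id and
endpoint are illustrative):

    import tenacity
    from airflow.hooks.http_hook import HttpHook

    hook = HttpHook(method="GET", http_conn_id="my_api")  # illustrative conn id
    # Retries just the flaky call with exponential backoff, instead of failing
    # the whole operator and starting the paging all over again.
    hook.run_with_advanced_retry(
        endpoint="v1/items",
        _retry_args=dict(
            stop=tenacity.stop_after_attempt(5),
            wait=tenacity.wait_exponential(multiplier=1, max=30),
        ),
    )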

Ideally, you would like to have a path that contains the day in the FTP
path, so you know which files to copy for which day, and you can also
backfill this. This would avoid race conditions, since you can do multiple
days in parallel.

Using XCom this would also be possible. First, have an operator that lists
the files on the FTP site and pushes this to XCom. Have another operator
that lists the files that you already have and pushes this to XCom as well.
Using a Python operator you can easily do a diff, and then you know which
files to download (a sketch follows). In this case, you should limit the
active dag-runs to one, to avoid race conditions.
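
A sketch of that diff step (the upstream task ids are illustrative):

    from airflow.operators.python_operator import PythonOperator

    def diff_files(**context):
        """Push the list of files still missing from the destination."""
        ti = context["ti"]
        remote = set(ti.xcom_pull(task_ids="list_ftp_files") or [])
        local = set(ti.xcom_pull(task_ids="list_downloaded_files") or [])
        return sorted(remote - local)  # return value is pushed to XCom

    diff = PythonOperator(
        task_id="diff_files",
        python_callable=diff_files,
        provide_context=True,  # Airflow 1.10-style context kwargs
        dag=dag,               # assumes an existing DAG object
    )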

I believe that for many use cases you don't need to keep state in Airflow;
it might be convenient, but it is just shifting the issue. If you can fetch
the state from somewhere external, and that is the single source of truth,
then that should be the preferred solution.

Cheers, Fokko

On Sat, 11 Jan 2020 at 04:21, Kaxil Naik wrote:

> Hey all,
>
> Really good document Jacob.
>
> Below are my thoughts on different topics discussed in the docs and the
> mailing list:
>
>
> *Prefix on Xcom*
> I don't think that is a good idea to mix this into Xcom. We should let Xcom
> be used for exactly one purpose.
>
> *Storing state in Xcom between Retries*
> This is definitely going to break idempotency. When the default retries are
> enabled this is going to create undesired effects.
>
> @Daniel Standish : I would like to more understand the needs of Stateful
> sets for sure. If you can give us more scenarios where
> you think stateful set can solve the issue, please let us know. Also, why
> do you think Variables are not the correct solution for it?
>
> I would imagine your custom operator can store some state in Variables. For
> example, you can store a json containing the following in
> Airflow Variables:
>
>- all_files
>- files_copied
>
> The variable, in this case, would have the details it needs to resume the
> copying from where it stopped. You custom operator as a first
> the step should check the Variable (with deserialized JSON).
>
> *The new structure for storing Stateful structure for reschedules*
> This can be a new table that has a relationship with the TI table or just a
> new column and this column can be loaded only when using
> Reschedule/async operators or sensors.
>
> Regards,
> Kaxil
>
> On Fri, Jan 10, 2020 at 11:45 PM Yingbo Wang  wrote:
>
> > The updated AIP with smart sensor design and some implementation is in
> >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
> > Open source PR https://github.com/apache/airflow/pull/5499
> >
> >
> > On Fri, Jan 10, 2020 at 1:44 PM Alex Guziel  > .invalid>
> > wrote:
> >
> > > I feel like for this, we can incorporate the smart sensor we have
> > > implemented at Airbnb that we plan on open sourcing.
> > >
> > > The TL;DR is that it works by having the Sensor task run briefly and
> > > materialize some state into the DB which master sensor tasks poke for.
> > This
> > > can be with custom time intervals.
> > >
> > > On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish 
> > > wrote:
> > >
> > > > I also am a big fan of adding better support for stateful tasks,
> > though I
> > > > know this is a thorny subject in airflow community.
> > > >
> > > > There are many data warehousing tasks where state makes a lot of
> sense.
> > > > While idempotence is a nice design pattern it's not the solution for
> > > every
> > > > problem.
> > > >
> > > > XCom may not be the way, but there should be a way.  Variables work,
> > but
> > > to
> > > > me it makes sense to have a separate structure that is associated
> 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Kaxil Naik
Hey all,

Really good document Jacob.

Below are my thoughts on different topics discussed in the docs and the
mailing list:


*Prefix on Xcom*
I don't think that is a good idea to mix this into Xcom. We should let Xcom
be used for exactly one purpose.

*Storing state in Xcom between Retries*
This is definitely going to break idempotency. When the default retries are
enabled this is going to create undesired effects.

@Daniel Standish : I would like to understand the needs of stateful sets
better, for sure. If you can give us more scenarios where you think
stateful sets can solve the issue, please let us know. Also, why do you
think Variables are not the correct solution for it?

I would imagine your custom operator can store some state in Variables. For
example, you can store a JSON blob containing the following in Airflow
Variables:

   - all_files
   - files_copied

The variable, in this case, would have the details it needs to resume the
copying from where it stopped. Your custom operator, as a first step,
should check the Variable (with the deserialized JSON); a sketch follows.
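
A sketch of that resume logic (the Variable key and the copy callable are
illustrative):

    from airflow.models import Variable

    STATE_KEY = "copy_task_state"  # illustrative Variable key

    def resume_copy(copy_one, all_files):
        """copy_one(path) transfers a single file; skip files already done."""
        state = Variable.get(
            STATE_KEY,
            default_var={"all_files": all_files, "files_copied": []},
            deserialize_json=True,
        )
        for f in all_files:
            if f in state["files_copied"]:
                continue  # copied on a previous attempt
            copy_one(f)
            state["files_copied"].append(f)
            Variable.set(STATE_KEY, state, serialize_json=True)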

*A new structure for storing reschedule state*
This can be a new table that has a relationship with the TI table, or just
a new column, and this column can be loaded only when using
reschedule/async operators or sensors.

Regards,
Kaxil

On Fri, Jan 10, 2020 at 11:45 PM Yingbo Wang  wrote:

> The updated AIP with smart sensor design and some implementation is in
>
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
> Open source PR https://github.com/apache/airflow/pull/5499
>
>
> On Fri, Jan 10, 2020 at 1:44 PM Alex Guziel  .invalid>
> wrote:
>
> > I feel like for this, we can incorporate the smart sensor we have
> > implemented at Airbnb that we plan on open sourcing.
> >
> > The TL;DR is that it works by having the Sensor task run briefly and
> > materialize some state into the DB which master sensor tasks poke for.
> This
> > can be with custom time intervals.
> >
> > On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish 
> > wrote:
> >
> > > I also am a big fan of adding better support for stateful tasks,
> though I
> > > know this is a thorny subject in airflow community.
> > >
> > > There are many data warehousing tasks where state makes a lot of sense.
> > > While idempotence is a nice design pattern it's not the solution for
> > every
> > > problem.
> > >
> > > XCom may not be the way, but there should be a way.  Variables work,
> but
> > to
> > > me it makes sense to have a separate structure that is associated with
> > the
> > > task, or the dag, or the task instance.
> > >
> > >
> > > On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P. <
> > > damian.sha...@credit-suisse.com> wrote:
> > >
> > > > FYI the design of the already discussed pull would allow state to be
> > > > persisted across retries:
> > > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> > While
> > > > I agree in most cases you are correct; I would however greatly
> > > > appreciate it if you do not explicitly exclude this capability in
> > > > the design of keeping state across reschedules.
> > > >
> > > > In lots of cases I already do exactly what you suggest, I scan the
> > state
> > > > of the target system and resume from there. However in lengthy
> > pipelines
> > > > this becomes complex, for example I have a pipeline that goes
> something
> > > > like:   FTP Download -> Decrypt File and Zip File -> Upload to Jump
> > Host
> > > > and remove Zip File -> Store in S3 Bucket.
> > > >
> > > > The data needs to be available at the end state as soon as possible
> so
> > > the
> > > > decryption operator is a sensor that is already running and waits for
> > the
> > > > file to be available and immediately decrypts and zips the file, same
> > for
> > > > the upload operator. From inside the corporate network environment
> it's
> > > not
> > > > possible to check the state of the s3 bucket so the original FTP
> > > Download
> > > > process cannot check the state of the final target system.
> Even
> > if
> > > > it was this could lead to a race condition if the data is in transit.
> > > >
> > > > I guess in environments where you have a lot of control and aren't
> > > > beholden to capricious policy, audit, and regulatory requirements
> such
> > > > scenarios must indeed seem niche :). Anyway we have a solution, just
> > > asking
> > > > you don't go out of your way to stop users from shooting themselves
> in
> > > the
> > > > foot if they're really determined to.
> > > >
> > > > Damian
> > > >
> > > > -Original Message-
> > > > From: Chris Palmer 
> > > > Sent: Friday, January 10, 2020 13:37
> > > > To: dev@airflow.apache.org
> > > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > > > Rescheduling in Operators
> > > >
> > > > I agree with Jarek that maintaining state between retries is not the
> > > right
> > > > thing to do. To be honest I'm not even convinced by the need for
> 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Yingbo Wang
The updated AIP with smart sensor design and some implementation is in
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
Open source PR https://github.com/apache/airflow/pull/5499


On Fri, Jan 10, 2020 at 1:44 PM Alex Guziel 
wrote:

> I feel like for this, we can incorporate the smart sensor we have
> implemented at Airbnb that we plan on open sourcing.
>
> The TL;DR is that it works by having the Sensor task run briefly and
> materialize some state into the DB which master sensor tasks poke for. This
> can be with custom time intervals.
>
> On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish 
> wrote:
>
> > I also am a big fan of adding better support for stateful tasks, though I
> > know this is a thorny subject in airflow community.
> >
> > There are many data warehousing tasks where state makes a lot of sense.
> > While idempotence is a nice design pattern it's not the solution for
> every
> > problem.
> >
> > XCom may not be the way, but there should be a way.  Variables work, but
> to
> > me it makes sense to have a separate structure that is associated with
> the
> > task, or the dag, or the task instance.
> >
> >
> > On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P. <
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > FYI the design of the already discussed pull would allow state to be
> > > persisted across retries:
> > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724
> While
> > > I agree in most cases you are correct; I would however greatly
> > > appreciate it if you do not explicitly exclude this capability in the design of
> > > keeping state across reschedules.
> > >
> > > In lots of cases I already do exactly what you suggest, I scan the
> state
> > > of the target system and resume from there. However in lengthy
> pipelines
> > > this becomes complex, for example I have a pipeline that goes something
> > > like:   FTP Download -> Decrypt File and Zip File -> Upload to Jump
> Host
> > > and remove Zip File -> Store in S3 Bucket.
> > >
> > > The data needs to be available at the end state as soon as possible so
> > the
> > > decryption operator is a sensor that is already running and waits for
> the
> > > file to be available and immediately decrypts and zips the file, same
> for
> > > the upload operator. From inside the corporate network environment it's
> > not
> > > possible to check the state of the s3 bucket so the original FTP
> > Download
> > > process cannot check the state of the final target system. Even
> if
> > > it was this could lead to a race condition if the data is in transit.
> > >
> > > I guess in environments where you have a lot of control and aren't
> > > beholden to capricious policy, audit, and regulatory requirements such
> > > scenarios must indeed seem niche :). Anyway we have a solution, just
> > asking
> > > you don't go out of your way to stop users from shooting themselves in
> > the
> > > foot if they're really determined to.
> > >
> > > Damian
> > >
> > > -Original Message-
> > > From: Chris Palmer 
> > > Sent: Friday, January 10, 2020 13:37
> > > To: dev@airflow.apache.org
> > > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > > Rescheduling in Operators
> > >
> > > I agree with Jarek that maintaining state between retries is not the
> > right
> > > thing to do. To be honest I'm not even convinced by the need for state
> > > between reschedules myself.
> > >
> > > While I know from past experience that FTP is a pain to deal with, I
> > think
> > > that your example is a pretty niche one. Additionally, when thinking
> > about
> > > idempotent task design, lots of tasks utilize state that exists in
> other
> > > systems. You should be thinking about what state you want some external
> > > system to be in after the task has run, rather than precisely what
> > actions
> > > you want the task to do.
> > >
> > > It's the subtle difference between:
> > >
> > > "When it runs, this task should create the required table in my
> database"
> > > (by running a simple 'CREATE TABLE foobar ...')
> > >
> > > and
> > >
> > > "After this tasks has finished, the required table should exist in my
> > > database" (by running 'CREATE TABLE IF NOT EXISTS foobar .')
> > >
> > >
> > > The first will fail if run repeatedly (without someone taking some
> other
> > > action like deleting the table). The second can be run as many times as
> > you
> > > want without error, but it relies on the state that is maintained by
> your
> > > database.
> > >
> > > In your case the external state I think you should care about is the
> file
> > > system you are downloading the files to, as opposed to some external
> > table
> > > that could get out of sync with the file system. So I would write the
> > > operator so that the first thing it does is compare the complete list
> > with
> > > what already exists in the destination, and then only attempt to
> download
> > > the ones that are missing.
> > >
> > 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Alex Guziel
I feel like for this, we can incorporate the smart sensor we have
implemented at Airbnb that we plan on open sourcing.

The TL;DR is that it works by having the Sensor task run briefly and
materialize some state into the DB which master sensor tasks poke for. This
can be done with custom time intervals.
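
Very roughly, the pattern is something like this (a minimal sketch, with a
plain sqlite table standing in for the Airflow DB; the names are illustrative,
not the actual implementation):

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE sensor_state "
               "(task_id TEXT PRIMARY KEY, target TEXT, done INTEGER)")

    # Each sensor task runs briefly: it registers what it is waiting for
    # and exits, instead of occupying a worker slot between pokes.
    def register_poke(task_id, target):
        db.execute("INSERT OR REPLACE INTO sensor_state VALUES (?, ?, 0)",
                   (task_id, target))
        db.commit()

    # A single "master" sensor loops over everything registered and pokes
    # the external systems on its own schedule.
    def master_poke(check_target):
        waiting = db.execute(
            "SELECT task_id, target FROM sensor_state WHERE done = 0").fetchall()
        for task_id, target in waiting:
            if check_target(target):  # e.g. file exists, external job finished
                db.execute("UPDATE sensor_state SET done = 1 WHERE task_id = ?",
                           (task_id,))
        db.commit()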

On Fri, Jan 10, 2020 at 1:42 PM Daniel Standish 
wrote:

> I also am a big fan of adding better support for stateful tasks, though I
> know this is a thorny subject in the airflow community.
>
> There are many data warehousing tasks where state makes a lot of sense.
> While idempotence is a nice design pattern it's not the solution for every
> problem.
>
> XCom may not be the way, but there should be a way.  Variables work, but to
> me it makes sense to have a separate structure that is associated with the
> task, or the dag, or the task instance.
>
>
> On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P. <
> damian.sha...@credit-suisse.com> wrote:
>
> > FYI the design of the already discussed pull would allow state to be
> > persisted across retries:
> > https://github.com/apache/airflow/pull/6370#issuecomment-546582724 While
> > I agree in most cases you are correct, I would however greatly
> > appreciate it if this capability were not explicitly excluded in the
> > design of keeping state across reschedules.
> >
> > In lots of cases I already do exactly what you suggest, I scan the state
> > of the target system and resume from there. However in lengthy pipelines
> > this becomes complex, for example I have a pipeline that goes something
> > like:   FTP Download -> Decrypt File and Zip File -> Upload to Jump Host
> > and remove Zip File -> Store in S3 Bucket.
> >
> > The data needs to be available at the end state as soon as possible so
> the
> > decryption operator is a sensor that is already running and waits for the
> > file to be available and immediately decrypts and zips the file, same for
> > the upload operator. From inside the corporate network environment it's
> not
> > possible to check the state of the s3 bucket so the original FTP
> > Download process cannot check the state of the final target system. Even if
> > it was this could lead to a race condition if the data is in transit.
> >
> > I guess in environments where you have a lot of control and aren't
> > beholden to capricious policy, audit, and regulatory requirements such
> > scenarios must indeed seem niche :). Anyway we have a solution, just
> asking
> > you don't go out of your way to stop users from shooting themselves in
> the
> > foot if they're really determined to.
> >
> > Damian
> >
> > -Original Message-
> > From: Chris Palmer 
> > Sent: Friday, January 10, 2020 13:37
> > To: dev@airflow.apache.org
> > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > Rescheduling in Operators
> >
> > I agree with Jarek that maintaining state between retries is not the
> right
> > thing to do. To be honest I'm not even convinced by the need for state
> > between reschedules myself.
> >
> > While I know from past experience that FTP is a pain to deal with, I
> think
> > that your example is a pretty niche one. Additionally, when thinking
> about
> > idempotent task design, lots of tasks utilize state that exists in other
> > systems. You should be thinking about what state you want some external
> > system to be in after the task has run, rather than precisely what
> actions
> > you want the task to do.
> >
> > It's the subtle difference between:
> >
> > "When it runs, this task should create the required table in my database"
> > (by running a simple 'CREATE TABLE foobar ...')
> >
> > and
> >
> > "After this tasks has finished, the required table should exist in my
> > database" (by running 'CREATE TABLE IF NOT EXISTS foobar .')
> >
> >
> > The first will fail if run repeatedly (without someone taking some other
> > action like deleting the table). The second can be run as many times as
> you
> > want without error, but it relies on the state that is maintained by your
> > database.
> >
> > In your case the external state I think you should care about is the file
> > system you are downloading the files to, as opposed to some external
> table
> > that could get out of sync with the file system. So I would write the
> > operator so that the first thing it does is compare the complete list
> with
> > what already exists in the destination, and then only attempt to download
> > the ones that are missing.
> >
> > Chris
> >
> > On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk 
> > wrote:
> >
> > > I wonder what others think of it.
> > >
> > > On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
> > > damian.sha...@credit-suisse.com> wrote:
> > >
> > > > I don't believe so, the default should be that state isn't preserved
> > > > across retries, just that it's possible for the user to enable it if
> > > > they are willing to take on that complexity.
> > >
> > >
> > > > We have lots of operators that do this already as if they 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Daniel Standish
I also am a big fan of adding better support for stateful tasks, though I
know this is a thorny subject in the airflow community.

There are many data warehousing tasks where state makes a lot of sense.
While idempotence is a nice design pattern it's not the solution for every
problem.

XCom may not be the way, but there should be a way.  Variables work, but to
me it makes sense to have a separate structure that is associated with the
task, or the dag, or the task instance.


On Fri, Jan 10, 2020 at 12:36 PM Shaw, Damian P. <
damian.sha...@credit-suisse.com> wrote:

> FYI the design of the already discussed pull would allow state to be
> persisted across retries:
> https://github.com/apache/airflow/pull/6370#issuecomment-546582724 While
> I agree in most cases you are correct, I would however greatly
> appreciate it if this capability were not explicitly excluded in the
> design of keeping state across reschedules.
>
> In lots of cases I already do exactly what you suggest, I scan the state
> of the target system and resume from there. However in lengthy pipelines
> this becomes complex, for example I have a pipeline that goes something
> like:   FTP Download -> Decrypt File and Zip File -> Upload to Jump Host
> and remove Zip File -> Store in S3 Bucket.
>
> The data needs to be available at the end state as soon as possible so the
> decryption operator is a sensor that is already running and waits for the
> file to be available and immediately decrypts and zips the file, same for
> the upload operator. From inside the corporate network environment it's not
> possible to check the state of the s3 bucket so the original FTP Download
> process cannot check the state of the final target system. Even if
> it was this could lead to a race condition if the data is in transit.
>
> I guess in environments where you have a lot of control and aren't
> beholden to capricious policy, audit, and regulatory requirements such
> scenarios must indeed seem niche :). Anyway we have a solution, just asking
> you don't go out of your way to stop users from shooting themselves in the
> foot if they're really determined to.
>
> Damian
>
> -Original Message-
> From: Chris Palmer 
> Sent: Friday, January 10, 2020 13:37
> To: dev@airflow.apache.org
> Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> Rescheduling in Operators
>
> I agree with Jarek that maintaining state between retries is not the right
> thing to do. To be honest I'm not even convinced by the need for state
> between reschedules myself.
>
> While I know from past experience that FTP is a pain to deal with, I think
> that your example is a pretty niche one. Additionally, when thinking about
> idempotent task design, lots of tasks utilize state that exists in other
> systems. You should be thinking about what state you want some external
> system to be in after the task has run, rather than precisely what actions
> you want the task to do.
>
> It's the subtle difference between:
>
> "When it runs, this task should create the required table in my database"
> (by running a simple 'CREATE TABLE foobar ...')
>
> and
>
> "After this tasks has finished, the required table should exist in my
> database" (by running 'CREATE TABLE IF NOT EXISTS foobar .')
>
>
> The first will fail if run repeatedly (without someone taking some other
> action like deleting the table). The second can be run as many times as you
> want without error, but it relies on the state that is maintained by your
> database.
>
> In your case the external state I think you should care about is the file
> system you are downloading the files to, as opposed to some external table
> that could get out of sync with the file system. So I would write the
> operator so that the first thing it does is compare the complete list with
> what already exists in the destination, and then only attempt to download
> the ones that are missing.
>
> Chris
>
> On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk 
> wrote:
>
> > I wonder what others think of it.
> >
> > On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > I don't believe so, the default should be that state isn't preserved
> > > across retries, just that it's possible for the user to enable it if
> > > they are willing to take on that complexity.
> >
> >
> > > We have lots of operators that do this already; if they fail partway
> > > through a job, the overhead of resuming from the beginning rather than
> > > keeping state on their progress is too much. It's just annoying that we
> > > have to keep this state outside Airflow, as it requires extra
> > > infrastructure for our task scheduling.
> > >
> > > For example we have an FTP site that we need to download 250 files
> > > from, the full file list is provided to the operator, the FTP
> > > connection is
> > very
> > > unreliable and the job often fails midway, on retry we don't want to
> > resume
> > > from the beginning of the job 

RE: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Shaw, Damian P.
FYI the design of the already discussed pull would allow state to be persisted 
across retries: 
https://github.com/apache/airflow/pull/6370#issuecomment-546582724 While I 
agree in most cases you are correct, I would however greatly appreciate it if 
this capability were not explicitly excluded in the design of keeping state 
across reschedules. 

In lots of cases I already do exactly what you suggest, I scan the state of the 
target system and resume from there. However in lengthy pipelines this becomes 
complex, for example I have a pipeline that goes something like:   FTP Download 
-> Decrypt File and Zip File -> Upload to Jump Host and remove Zip File -> 
Store in S3 Bucket.

The data needs to be available at the end state as soon as possible so the 
decryption operator is a sensor that is already running and waits for the file 
to be available and immediately decrypts and zips the file, same for the upload 
operator. From inside the corporate network environment it's not possible to 
check the state of the s3 bucket so the original FTP Download process cannot 
check the state of the final target system. Even if it was this could 
lead to a race condition if the data is in transit.

I guess in environments where you have a lot of control and aren't beholden to 
capricious policy, audit, and regulatory requirements such scenarios must 
indeed seem niche :). Anyway we have a solution, just asking you don't go out of 
your way to stop users from shooting themselves in the foot if they're really 
determined to.

Damian 

-Original Message-
From: Chris Palmer  
Sent: Friday, January 10, 2020 13:37
To: dev@airflow.apache.org
Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling 
in Operators

I agree with Jarek that maintaining state between retries is not the right 
thing to do. To be honest I'm not even convinced by the need for state between 
reschedules myself.

While I know from past experience that FTP is a pain to deal with, I think that 
your example is a pretty niche one. Additionally, when thinking about 
idempotent task design, lots of tasks utilize state that exists in other 
systems. You should be thinking about what state you want some external system 
to be in after the task has run, rather than precisely what actions you want 
the task to do.

It's the subtle difference between:

"When it runs, this task should create the required table in my database"
(by running a simple 'CREATE TABLE foobar ...')

and

"After this tasks has finished, the required table should exist in my database" 
(by running 'CREATE TABLE IF NOT EXISTS foobar .')


The first will fail if run repeatedly (without someone taking some other action 
like deleting the table). The second can be run as many times as you want 
without error, but it relies on the state that is maintained by your database.

In your case the external state I think you should care about is the file 
system you are downloading the files to, as opposed to some external table that 
could get out of sync with the file system. So I would write the operator so 
that the first thing it does is compare the complete list with what already 
exists in the destination, and then only attempt to download the ones that are 
missing.

Chris

On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk 
wrote:

> I wonder what others think of it.
>
> On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. < 
> damian.sha...@credit-suisse.com> wrote:
>
> > I don't believe so, the default should be that state isn't preserved 
> > across retries, just that it's possible for the user to enable it if 
> > they are willing to take on that complexity.
>
>
> > We have lots of operators that do this already; if they fail partway 
> > through a job, the overhead of resuming from the beginning rather than 
> > keeping state on their progress is too much. It's just annoying that we 
> > have to keep this state outside Airflow, as it requires extra 
> > infrastructure for our task scheduling.
> >
> > For example we have an FTP site that we need to download 250 files 
> > from, the full file list is provided to the operator, the FTP 
> > connection is
> very
> > unreliable and the job often fails midway, on retry we don't want to
> resume
> > from the beginning of the job so we store the state of our progress 
> > in a table outside Airflow. We can't split the job into 250 tasks 
> > because the FTP site only accepts 1 connection at a time so the 
> > overhead of 250
> logins
> > would add an hour to the process and it would make the Airflow UI 
> > near unusable.
> >
>
> I do not know all the details of course - but your case seems to be 
> solvable much easier and in "Airflow" way. You can have custom 
> operator that continues running until everything is downloaded and 
> retries failed transfer. The state of which file is downloaded should 
> be kept in memory and even if FTP operation fails, it should retry 
> each failed file rather than fail the whole 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Anton Zayniev
I love the idea of rescheduling, but I'd say using xcom for it is a bad
idea. It is already quite a confusing concept (mostly due to lack of
documentation). Making it stateful depending on context makes it even more
frightening. To me it sounds like a function that can change its signature
-- occasionally useful, usually dangerous.
Anton


On Fri, Jan 10, 2020, 20:52 Jarek Potiuk  wrote:

> I wonder what others think of it.
>
> On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
> damian.sha...@credit-suisse.com> wrote:
>
> > I don't believe so, the default should be that state isn't preserved
> > across retries, just that it's possible for the user to enable it if they
> > are willing to take on that complexity.
>
>
> > We have lots of operators that do this already; if they fail partway
> > through a job, the overhead of resuming from the beginning rather than
> > keeping state on their progress is too much. It's just annoying that we
> > have to keep this state outside Airflow, as it requires extra
> > infrastructure for our task scheduling.
> >
> > For example we have an FTP site that we need to download 250 files from,
> > the full file list is provided to the operator, the FTP connection is
> very
> > unreliable and the job often fails midway, on retry we don't want to
> resume
> > from the beginning of the job so we store the state of our progress in a
> > table outside Airflow. We can't split the job into 250 tasks because the
> > FTP site only accepts 1 connection at a time so the overhead of 250
> logins
> > would add an hour to the process and it would make the Airflow UI near
> > unusable.
> >
>
> I do not know all the details of course - but your case seems to be
> solvable much easier and in "Airflow" way. You can have custom operator
> that continues running until everything is downloaded and retries failed
> transfer. The state of which file is downloaded should be kept in memory
> and even if FTP operation fails, it should retry each failed file rather
> than fail the whole operator.  That would keep it idempotent, and keep the
> state in memory rather than in Airflow's DB or in external system. Even if
> you already have an operator that transfers X files already and you do not
> want to change it, you can likely wrap it/extend to keep list of files in
> memory and retry only those files that failed so far. IMHO, in your solution
> you do exactly what you are not supposed to according to Airflow's design -
> unless you do some extra logic and complexity your operator is not
> idempotent.
>
> For example - If you delete downloaded files for whatever reason and keep
> the external state and run backfill, I believe what will happen (unless you
> have some extra logic) it will see (from external state) that the files
> were already downloaded and will not download them again. If you use the
> in-memory state, it will work as expected - next time you run it via
> back-fill,  it will re-download all files.
>
> J.
>
>
> > Damian
> >
> > -Original Message-
> > From: Jarek Potiuk 
> > Sent: Friday, January 10, 2020 11:45
> > To: dev@airflow.apache.org
> > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > Rescheduling in Operators
> >
> > Just opening the Pandora's box :).
> >
> > I personally think we should not keep state between retries. It opens up
> > possibilities to a whole host of problems and edge cases, and allows people
> > to solve some problems in non-airflow'y ways - losing some important
> > properties (mainly idempotency). Tasks in Airflow should be idempotent and
> > stateless from the operator author's point of view.
> >
> > I think there is quite a big conceptual difference between keeping the
> > reschedule state (it's just optimising execution of the same task) and
> > keeping state between retries.
> >
> > Right now when you write your operator it's simple - no state to handle.
> > XComs (and everything else) is cleared when task is re-run.
> > With Poke reschedule proposal - the only thing you can do is to
> > save/retrieve a single ID attached to the current task instance. This id
> > will not be cleared on reschedule, but it will be cleared on retry.
> >
> > If we introduce saving state on retries, it opens up a lot of questions -
> > should we keep all retries? or just one? What data should we keep -
> should
> > we allow more structured data? What guidelines should people follow when
> > writing their operators ? And it's a totally different feature that
> should
> > be discussed separately.
> >
> > J.
> >
> >
> > On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > I just wanted to add a related use case is task retries, there are
> > > lots of scenarios where keeping state between the retries as well as
> > > the reschedules would be extremely helpful, so as long as whatever the
> > > solution is isn't overly narrow I'd be extremely appreciative.
> > >
> > > Damian
> > >
> > > -Original 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Chris Palmer
I agree with Jarek that maintaining state between retries is not the right
thing to do. To be honest I'm not even convinced by the need for state
between reschedules myself.

While I know from past experience that FTP is a pain to deal with, I think
that your example is a pretty niche one. Additionally, when thinking about
idempotent task design, lots of tasks utilize state that exists in other
systems. You should be thinking about what state you want some external
system to be in after the task has run, rather than precisely what actions
you want the task to do.

It's the subtle difference between:

"When it runs, this task should create the required table in my database"
(by running a simple 'CREATE TABLE foobar ...')

and

"After this tasks has finished, the required table should exist in my
database" (by running 'CREATE TABLE IF NOT EXISTS foobar .')


The first will fail if run repeatedly (without someone taking some other
action like deleting the table). The second can be run as many times as you
want without error, but it relies on the state that is maintained by your
database.
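
In code the contrast is tiny but important (a minimal sketch, using sqlite
for illustration):

    import sqlite3

    conn = sqlite3.connect(":memory:")

    # "When it runs, create the table": fails on the second run, because
    # the table already exists.
    def create_imperative():
        conn.execute("CREATE TABLE foobar (id INTEGER)")

    # "After it has run, the table should exist": describes the desired end
    # state, so it is safe to run any number of times.
    def create_idempotent():
        conn.execute("CREATE TABLE IF NOT EXISTS foobar (id INTEGER)")

    create_idempotent()
    create_idempotent()  # no error; the end state is already satisfied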

In your case the external state I think you should care about is the file
system you are downloading the files to, as opposed to some external table
that could get out of sync with the file system. So I would write the
operator so that the first thing it does is compare the complete list with
what already exists in the destination, and then only attempt to download
the ones that are missing.
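
Sketched out, that approach looks something like this (ftplib, with
illustrative names):

    import os
    import ftplib

    def download_missing(host, user, password, remote_files, dest_dir):
        # The destination directory itself is the state: whatever is already
        # there does not need to be downloaded again.
        existing = set(os.listdir(dest_dir))
        missing = [name for name in remote_files if name not in existing]
        with ftplib.FTP(host, user, password) as ftp:
            for name in missing:
                with open(os.path.join(dest_dir, name), "wb") as out:
                    ftp.retrbinary("RETR " + name, out.write)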

Chris

On Fri, Jan 10, 2020 at 12:52 PM Jarek Potiuk 
wrote:

> I wonder what others think of it.
>
> On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
> damian.sha...@credit-suisse.com> wrote:
>
> > I don't believe so, the default should be that state isn't preserved
> > across retries, just that it's possible for the user to enable it if they
> > are willing to take on that complexity.
>
>
> > We have lots of operators that do this already; if they fail partway
> > through a job, the overhead of resuming from the beginning rather than
> > keeping state on their progress is too much. It's just annoying that we
> > have to keep this state outside Airflow, as it requires extra
> > infrastructure for our task scheduling.
> >
> > For example we have an FTP site that we need to download 250 files from,
> > the full file list is provided to the operator, the FTP connection is
> very
> > unreliable and the job often fails midway, on retry we don't want to
> resume
> > from the beginning of the job so we store the state of our progress in a
> > table outside Airflow. We can't split the job into 250 tasks because the
> > FTP site only accepts 1 connection at a time so the overhead of 250
> logins
> > would add an hour to the process and it would make the Airflow UI near
> > unusable.
> >
>
> I do not know all the details of course - but your case seems to be
> solvable much easier and in "Airflow" way. You can have custom operator
> that continues running until everything is downloaded and retries failed
> transfer. The state of which file is downloaded should be kept in memory
> and even if FTP operation fails, it should retry each failed file rather
> than fail the whole operator.  That would keep it idempotent, and keep the
> state in memory rather than in Airflow's DB or in external system. Even if
> you already have an operator that transfers X files already and you do not
> want to change it, you can likely wrap it/extend to keep list of files in
> memory and retry only those files that failed so far. IMHO, in your solution
> you do exactly what you are not supposed to according to Airflow's design -
> unless you do some extra logic and complexity your operator is not
> idempotent.
>
> For example - If you delete downloaded files for whatever reason and keep
> the external state and run backfill, I believe what will happen (unless you
> have some extra logic) it will see (from external state) that the files
> were already downloaded and will not download them again. If you use the
> in-memory state, it will work as expected - next time you run it via
> back-fill,  it will re-download all files.
>
> J.
>
>
> > Damian
> >
> > -Original Message-
> > From: Jarek Potiuk 
> > Sent: Friday, January 10, 2020 11:45
> > To: dev@airflow.apache.org
> > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > Rescheduling in Operators
> >
> > Just opening the Pandora's box :).
> >
> > I personally think we should not keep state between retries. It opens up
> > possibilities to a whole host of problems and edge cases, and allows people
> > to solve some problems in non-airflow'y ways - losing some important
> > properties (mainly idempotency). Tasks in Airflow should be idempotent and
> > stateless from the operator author's point of view.
> >
> > I think there is quite a big conceptual difference between keeping the
> > reschedule state (it's just optimising execution of the same task) and
> > keeping state between retries.

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Jarek Potiuk
I wonder what others think of it.

On Fri, Jan 10, 2020 at 6:04 PM Shaw, Damian P. <
damian.sha...@credit-suisse.com> wrote:

> I don't believe so, the default should be that state isn't preserved
> across retries, just that it's possible for the user to enable it if they
> are willing to take on that complexity.


> We have lots of operators that do this already; if they fail partway
> through a job, the overhead of resuming from the beginning rather than
> keeping state on their progress is too much. It's just annoying that we have
> to keep this state outside Airflow, as it requires extra infrastructure for
> our task scheduling.
>
> For example we have an FTP site that we need to download 250 files from,
> the full file list is provided to the operator, the FTP connection is very
> unreliable and the job often fails midway, on retry we don't want to resume
> from the beginning of the job so we store the state of our progress in a
> table outside Airflow. We can't split the job into 250 tasks because the
> FTP site only accepts 1 connection at a time so the overhead of 250 logins
> would add an hour to the process and it would make the Airflow UI near
> unusable.
>

I do not know all the details of course - but your case seems to be
solvable much easier and in "Airflow" way. You can have custom operator
that continues running until everything is downloaded and retries failed
transfer. The state of which file is downloaded should be kept in memory
and even if FTP operation fails, it should retry each failed file rather
than fail the whole operator.  That would keep it idempotent, and keep the
state in memory rather than in Airflow's DB or in external system. Even if
you already have an operator that transfers X files already and you do not
want to change it, you can likely wrap it/extend to keep list of files in
memory and retry only those files that failed so far. IMHO, in your solution
you do exactly what you are not supposed to according to Airflow's design -
unless you do some extra logic and complexity your operator is not
idempotent.

For example - If you delete downloaded files for whatever reason and keep
the external state and run backfill, I believe what will happen (unless you
have some extra logic) it will see (from external state) that the files
were already downloaded and will not download them again. If you use the
in-memory state, it will work as expected - next time you run it via
back-fill,  it will re-download all files.
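
In code, the in-memory version is roughly this (a minimal sketch with
illustrative names):

    import os
    import time
    import ftplib

    def download_all(host, user, password, file_list, dest_dir, max_rounds=20):
        # The only state is this in-memory set; a fresh run (or a backfill)
        # always starts again from the full list.
        remaining = set(file_list)
        for _ in range(max_rounds):
            if not remaining:
                return
            try:
                with ftplib.FTP(host, user, password) as ftp:
                    for name in sorted(remaining):
                        with open(os.path.join(dest_dir, name), "wb") as out:
                            ftp.retrbinary("RETR " + name, out.write)
                        remaining.discard(name)
            except ftplib.all_errors:
                time.sleep(30)  # connection dropped; retry only what is left
        raise RuntimeError("gave up with %d files remaining" % len(remaining))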

J.


> Damian
>
> -Original Message-
> From: Jarek Potiuk 
> Sent: Friday, January 10, 2020 11:45
> To: dev@airflow.apache.org
> Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> Rescheduling in Operators
>
> Just opening the Pandora's box :).
>
> I personally think we should not keep state between retries. It opens up
> possibilities to a whole host of problems and edge cases, and allows people
> to solve some problems in non-airflow'y ways - losing some important
> properties (mainly idempotency). Tasks in Airflow should be idempotent and
> stateless from the operator author's point of view.
>
> I think there is quite a big conceptual difference between keeping the
> reschedule state (it's just optimising execution of the same task) and
> keeping state between retries.
>
> Right now when you write your operator it's simple - no state to handle.
> XComs (and everything else) is cleared when task is re-run.
> With Poke reschedule proposal - the only thing you can do is to
> save/retrieve a single ID attached to the current task instance. This id
> will not be cleared on reschedule, but it will be cleared on retry.
>
> If we introduce saving state on retries, it opens up a lot of questions -
> should we keep all retries? or just one? What data should we keep - should
> we allow more structured data? What guidelines should people follow when
> writing their operators ? And it's a totally different feature that should
> be discussed separately.
>
> J.
>
>
> On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <
> damian.sha...@credit-suisse.com> wrote:
>
> > I just wanted to add a related use case is task retries, there are
> > lots of scenarios where keeping state between the retries as well as
> > the reschedules would be extremely helpful, so as long as whatever the
> > solution is isn't overly narrow I'd be extremely appreciative.
> >
> > Damian
> >
> > -Original Message-
> > From: Jarek Potiuk 
> > Sent: Friday, January 10, 2020 11:05
> > To: dev@airflow.apache.org
> > Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> > Rescheduling in Operators
> >
> > Also another point to discuss here. As an original author of the idea
> > of using prefix in xcom, I think after the discussions I changed my
> > mind. I think that simply adding a field to an existing table
> > (TaskReschedule?) where we could keep all the data that need to be
> > persisted, seems to be a good idea. We do not impact performance too
> > much (the table is 

RE: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Shaw, Damian P.
I don't believe so, the default should be that state isn't preserved across 
retries, just that it's possible for the user to enable it if they are willing 
to take on that complexity.

We have lots of operators that do this already; if they fail partway through 
a job, the overhead of resuming from the beginning rather than keeping state on 
their progress is too much. It's just annoying that we have to keep this state 
outside Airflow, as it requires extra infrastructure for our task scheduling.

For example we have an FTP site that we need to download 250 files from, the 
full file list is provided to the operator, the FTP connection is very 
unreliable and the job often fails midway, on retry we don't want to resume 
from the beginning of the job so we store the state of our progress in a table 
outside Airflow. We can't split the job into 250 tasks because the FTP site 
only accepts 1 connection at a time so the overhead of 250 logins would add an 
hour to the process and it would make the Airflow UI near unusable. 
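
For concreteness, that workaround boils down to something like this (a minimal
sketch, with sqlite standing in for the external table; the names are
illustrative):

    import sqlite3

    db = sqlite3.connect("ftp_progress.db")  # the extra infrastructure outside Airflow
    db.execute("CREATE TABLE IF NOT EXISTS progress (filename TEXT PRIMARY KEY)")

    def files_still_needed(full_file_list):
        done = {row[0] for row in db.execute("SELECT filename FROM progress")}
        return [name for name in full_file_list if name not in done]

    def mark_done(filename):
        # Called after each successful download, so a retry resumes from
        # file 151 of 250 instead of file 1.
        db.execute("INSERT OR IGNORE INTO progress VALUES (?)", (filename,))
        db.commit()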

Damian

-Original Message-
From: Jarek Potiuk  
Sent: Friday, January 10, 2020 11:45
To: dev@airflow.apache.org
Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling 
in Operators

Just opening the Pandora's box :).

I personally think we should not keep state between retries. It opens up 
possibilities to a whole host of problems and edge cases, and allows people to 
solve some problems in non-airflow'y ways - losing some important properties 
(mainly idempotency). Tasks in Airflow should be idempotent and stateless from 
the operator author's point of view.

I think there is quite a big conceptual difference between keeping the 
reschedule state (it's just optimising execution of the same task) and 
keeping state between retries.

Right now when you write your operator it's simple - no state to handle.
XComs (and everything else) is cleared when task is re-run.
With Poke reschedule proposal - the only thing you can do is to save/retrieve a 
single ID attached to the current task instance. This id will not be cleared on 
reschedule, but it will be cleared on retry.

If we introduce saving state on retries, it opens up a lot of questions - 
should we keep all retries? or just one? What data should we keep - should we 
allow more structured data? What guidelines should people follow when writing 
their operators ? And it's a totally different feature that should be discussed 
separately.

J.


On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. < 
damian.sha...@credit-suisse.com> wrote:

> I just wanted to add a related use case is task retries, there are 
> lots of scenarios where keeping state between the retries as well as 
> the reschedules would be extremely helpful, so as long as whatever the 
> solution is isn't overly narrow I'd be extremely appreciative.
>
> Damian
>
> -Original Message-
> From: Jarek Potiuk 
> Sent: Friday, January 10, 2020 11:05
> To: dev@airflow.apache.org
> Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke 
> Rescheduling in Operators
>
> Also another point to discuss here. As an original author of the idea 
> of using prefix in xcom, I think after the discussions I changed my 
> mind. I think that simply adding a field to an existing table 
> (TaskReschedule?) where we could keep all the data that need to be 
> persisted, seems to be a good idea. We do not impact performance too 
> much (the table is already
> queried) , we do not add too much complexity and we do not try to 
> introduce a generic "state" storage - this would be a solution 
> dedicated to only handle rescheduling.
>
> On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko 
> 
> wrote:
>
> > The repoke logic as it is now implemented with the sensor, is able 
> > to recover from an unexpected crash. After each poke, it will just 
> > go to sleep. If the process crashes in between, it might become a 
> > zombie task in the end, but this is also taken care of by the 
> > scheduler. In this case, the scheduler thinks the task is still 
> > running, but in
> reality, it crashed.
> > There is a timeout that will reset the execution. Hopefully, this 
> > doesn't happen often, and should only occur when something is off 
> > (for example a machine crashed, or a network partition, etc). HTH
> >
> > Personally I don't like duplicating the same table for such a 
> > similar use case. But that's a design choice I guess.
> >
> > If we go for the async executor, the above might be different. I 
> > think it is good to not include this in the discussion.
> >
> > Cheers, Fokko
> >
> >
> > On Thu, Jan 9, 2020 at 19:33, Darren Weber <
> > dweber.consult...@gmail.com
> > > wrote:
> >
> > > Not sure whether to add to this email thread or the google-doc 
> > > (not sure
> > if
> > > that google-doc is just a meeting-notes or if it should evolve 
> > > into a
> > spec
> > > :grin:).
> > >
> > > Maybe a stupid suggestion, but here it is anyway:
> > >
> > > XCom - 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Jarek Potiuk
Just opening the Pandora's box :).

I personally think we should not keep state between retries. It opens up
possibilities to a whole host of problems and edge cases, and allows people
to solve some problems in non-airflow'y ways - losing some important
properties (mainly idempotency). Tasks in Airflow should be idempotent and
stateless from the operator author's point of view.

I think there is quite a big conceptual difference between keeping the
reschedule state (it's just optimising execution of the same task) and
keeping state between retries.

Right now when you write your operator it's simple - no state to handle.
XComs (and everything else) is cleared when task is re-run.
With Poke reschedule proposal - the only thing you can do is to
save/retrieve a single ID attached to the current task instance. This id
will not be cleared on reschedule, but it will be cleared on retry.
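
To make the shape of that concrete, a minimal sketch (all names here are
hypothetical; no such API exists in Airflow today):

    # One small piece of state (an external job id) survives reschedules
    # but is cleared on retry.
    class RescheduleStateSketch:
        def __init__(self, submit_job, job_finished):
            self.submit_job = submit_job      # starts the external job, returns an id
            self.job_finished = job_finished  # polls the external system
            self._state = {}                  # stands in for the per-task-instance store

        def poke(self, ti_key):
            job_id = self._state.get(ti_key)  # still present after a reschedule
            if job_id is None:
                job_id = self.submit_job()
                self._state[ti_key] = job_id
            return self.job_finished(job_id)

        def on_retry(self, ti_key):
            self._state.pop(ti_key, None)     # a retry starts from a clean slate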

If we introduce saving state on retries, it opens up a lot of questions -
should we keep all retries? or just one? What data should we keep - should
we allow more structured data? What guidelines should people follow when
writing their operators ? And it's a totally different feature that should
be discussed separately.

J.


On Fri, Jan 10, 2020 at 5:15 PM Shaw, Damian P. <
damian.sha...@credit-suisse.com> wrote:

> I just wanted to add a related use case is task retries, there are lots of
> scenarios where keeping state between the retries as well as the
> reschedules would be extremely helpful, so as long as whatever the solution
> is isn't overly narrow I'd be extremely appreciative.
>
> Damian
>
> -Original Message-
> From: Jarek Potiuk 
> Sent: Friday, January 10, 2020 11:05
> To: dev@airflow.apache.org
> Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke
> Rescheduling in Operators
>
> Also another point to discuss here. As an original author of the idea of
> using prefix in xcom, I think after the discussions I changed my mind. I
> think that simply adding a field to an existing table (TaskReschedule?)
> where we could keep all the data that need to be persisted, seems to be a
> good idea. We do not impact performance too much (the table is already
> queried) , we do not add too much complexity and we do not try to introduce
> a generic "state" storage - this would be a solution dedicated to only
> handle rescheduling.
>
> On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko 
> wrote:
>
> > The repoke logic as it is now implemented with the sensor, is able to
> > recover from an unexpected crash. After each poke, it will just go to
> > sleep. If the process crashes in between, it might become a zombie
> > task in the end, but this is also taken care of by the scheduler. In
> > this case, the scheduler thinks the task is still running, but in
> reality, it crashed.
> > There is a timeout that will reset the execution. Hopefully, this
> > doesn't happen often, and should only occur when something is off (for
> > example a machine crashed, or a network partition, etc). HTH
> >
> > Personally I don't like duplicating the same table for such a similar
> > use case. But that's a design choice I guess.
> >
> > If we go for the async executor, the above might be different. I think
> > it is good to not include this in the discussion.
> >
> > Cheers, Fokko
> >
> >
> > On Thu, Jan 9, 2020 at 19:33, Darren Weber <
> > dweber.consult...@gmail.com
> > > wrote:
> >
> > > Not sure whether to add to this email thread or the google-doc (not
> > > sure
> > if
> > > that google-doc is just a meeting-notes or if it should evolve into
> > > a
> > spec
> > > :grin:).
> > >
> > > Maybe a stupid suggestion, but here it is anyway:
> > >
> > > XCom - communication between elements of a DAG
> > >
> > > XState - key/value store available for each element of a DAG
> > >
> > > Clearly separate the behavior of a stateful resource (XState) from
> > > one
> > that
> > > is not intended to be stateful (XCom), if that makes any sense?
> > (Creating
> > > a new XState feature is similar to a new db-table, I guess.)
> > >
> > > Just to explain what I understand about the goals of how Airflow
> > > should behave when it has some ability for an operator to reschedule
> > > pokes and
> > the
> > > scope of the changes.  In the big picture, it's important that
> > > Airflow
> > can
> > > resurrect a DAG on a restart when some elements of the DAG contain
> > > operators/sensors that are dependent on external cloud operations (e.g.
> > AWS
> > > Batch).  This is feasible when Airflow can persist any unique job-ID
> > > defined by the external job provider (e.g. AWS Batch "jobId") and
> > > any related identifiers for the job (e.g. AWS Batch infrastructure
> > > ARNs for batch queue/compute-env etc and all of this detail is
> > > captured in the AwsBatchOperator already).  Assuming Airflow runs a
> > > DAG that spins up
> > 100's
> > > or 1000's of such external jobs and persists the external "jobId",
> > > when Airflow crashes or is 

RE: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Shaw, Damian P.
I just wanted to add a related use case is task retries, there are lots of 
scenarios where keeping state between the retries as well as the reschedules 
would be extremely helpful, so as long as whatever the solution is isn't overly 
narrow I'd be extremely appreciative.

Damian

-Original Message-
From: Jarek Potiuk  
Sent: Friday, January 10, 2020 11:05
To: dev@airflow.apache.org
Subject: Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling 
in Operators

Also another point to discuss here. As an original author of the idea of using 
prefix in xcom, I think after the discussions I changed my mind. I think that 
simply adding a field to an existing table (TaskReschedule?) where we could 
keep all the data that need to be persisted, seems to be a good idea. We do not 
impact performance too much (the table is already queried) , we do not add too 
much complexity and we do not try to introduce a generic "state" storage - this 
would be a solution dedicated to only handle rescheduling.

On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko 
wrote:

> The repoke logic as it is now implemented with the sensor, is able to 
> recover from an unexpected crash. After each poke, it will just go to 
> sleep. If the process crashes in between, it might become a zombie 
> task in the end, but this is also taken care of by the scheduler. In 
> this case, the scheduler thinks the task is still running, but in reality, it 
> crashed.
> There is a timeout that will reset the execution. Hopefully, this 
> doesn't happen often, and should only occur when something is off (for 
> example a machine crashed, or a network partition, etc). HTH
>
> Personally I don't like duplicating the same table for such a similar 
> use case. But that's a design choice I guess.
>
> If we go for the async executor, the above might be different. I think 
> it is good to not include this in the discussion.
>
> Cheers, Fokko
>
>
> On Thu, Jan 9, 2020 at 19:33, Darren Weber <
> dweber.consult...@gmail.com
> > wrote:
>
> > Not sure whether to add to this email thread or the google-doc (not 
> > sure
> if
> > that google-doc is just a meeting-notes or if it should evolve into 
> > a
> spec
> > :grin:).
> >
> > Maybe a stupid suggestion, but here it is anyway:
> >
> > XCom - communication between elements of a DAG
> >
> > XState - key/value store available for each element of a DAG
> >
> > Clearly separate the behavior of a stateful resource (XState) from 
> > one
> that
> > is not intended to be stateful (XCom), if that makes any sense?
> (Creating
> > a new XState feature is similar to a new db-table, I guess.)
> >
> > Just to explain what I understand about the goals of how Airflow 
> > should behave when it has some ability for an operator to reschedule 
> > pokes and
> the
> > scope of the changes.  In the big picture, it's important that 
> > Airflow
> can
> > resurrect a DAG on a restart when some elements of the DAG contain 
> > operators/sensors that are dependent on external cloud operations (e.g.
> AWS
> > Batch).  This is feasible when Airflow can persist any unique job-ID 
> > defined by the external job provider (e.g. AWS Batch "jobId") and 
> > any related identifiers for the job (e.g. AWS Batch infrastructure 
> > ARNs for batch queue/compute-env etc and all of this detail is 
> > captured in the AwsBatchOperator already).  Assuming Airflow runs a 
> > DAG that spins up
> 100's
> > or 1000's of such external jobs and persists the external "jobId", 
> > when Airflow crashes or is stopped for upgrades etc. and restarted, 
> > the operators that submitted the jobs should be able to try to check 
> > on the state of those previously submitted jobs.  If the jobs are 
> > still running
> on
> > the external provider (e.g. AWS Batch), it should be able to resume 
> > monitoring (poking) the job status without re-submitting a duplicate 
> > job (also any failure to poke a job should have some level of 
> > poke-retry behavior that does not immediately fail the Airflow task 
> > that results in somehow re-submitting the same job that is already 
> > running).  So, in that context, what is the scope of the 
> > "reshedule-poke" changes - do they
> simply
> > release a worker and so long as Airflow is "up" (has not crashed), 
> > the reschedule can resume poking, but if Airflow crashes, the whole 
> > thing starts over again because the state of the task is not 
> > resilient to
> Airflow
> > crashing?  Or, does the work on the "reschedule-poke" also provide 
> > resilience when Airflow crashes?  If the goal is to be resilient to
> Airflow
> > crashes, what is required for the "reschedule-poke" work to 
> > accomplish
> that
> > goal, if it doesn't already?  (Would the architecture for Airflow 
> > resilience be out-of-scope in this context because it involves more 
> > complexity, like a Kafka cluster?)
> >
> > -- Darren
> >
> >
> >
> > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk 
> > 
> > wrote:
> >
> > > Commented as 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Jarek Potiuk
Also another point to discuss here. As an original author of the idea of
using prefix in xcom, I think after the discussions I changed my mind. I
think that simply adding a field to an existing table (TaskReschedule?)
where we could keep all the data that need to be persisted, seems to be a
good idea. We do not impact performance too much (the table is
already queried) , we do not add too much complexity and we do not try to
introduce a generic "state" storage - this would be a solution dedicated to
only handle rescheduling.
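
In migration terms that would be a very small change, something like this
(an illustrative alembic sketch, not an actual Airflow migration; the column
name is made up):

    import sqlalchemy as sa
    from alembic import op

    # One nullable column on the existing task_reschedule table to carry
    # the data that must survive reschedules.
    def upgrade():
        op.add_column("task_reschedule",
                      sa.Column("reschedule_state", sa.Text(), nullable=True))

    def downgrade():
        op.drop_column("task_reschedule", "reschedule_state")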

On Fri, Jan 10, 2020 at 1:44 PM Driesprong, Fokko 
wrote:

> The repoke logic as it is now implemented with the sensor, is able to
> recover from an unexpected crash. After each poke, it will just go to
> sleep. If the process crashes in between, it might become a zombie task in
> the end, but this is also taken care of by the scheduler. In this case, the
> scheduler thinks the task is still running, but in reality, it crashed.
> There is a timeout that will reset the execution. Hopefully, this doesn't
> happen often, and should only occur when something is off (for example a
> machine crashed, or a network partition, etc). HTH
>
> Personally I don't like duplicating the same table for such a similar use
> case. But that's a design choice I guess.
>
> If we go for the async executor, the above might be different. I think it
> is good to not include this in the discussion.
>
> Cheers, Fokko
>
>
> On Thu, Jan 9, 2020 at 19:33, Darren Weber <
> dweber.consult...@gmail.com
> > wrote:
>
> > Not sure whether to add to this email thread or the google-doc (not sure
> if
> > that google-doc is just a meeting-notes or if it should evolve into a
> spec
> > :grin:).
> >
> > Maybe a stupid suggestion, but here it is anyway:
> >
> > XCom - communication between elements of a DAG
> >
> > XState - key/value store available for each element of a DAG
> >
> > Clearly separate the behavior of a stateful resource (XState) from one
> that
> > is not intended to be stateful (XCom), if that makes any sense?
> (Creating
> > a new XState feature is similar to a new db-table, I guess.)
> >
> > Just to explain what I understand about the goals of how Airflow should
> > behave when it has some ability for an operator to reschedule pokes and
> the
> > scope of the changes.  In the big picture, it's important that Airflow
> can
> > resurrect a DAG on a restart when some elements of the DAG contain
> > operators/sensors that are dependent on external cloud operations (e.g.
> AWS
> > Batch).  This is feasible when Airflow can persist any unique job-ID
> > defined by the external job provider (e.g. AWS Batch "jobId") and any
> > related identifiers for the job (e.g. AWS Batch infrastructure ARNs for
> > batch queue/compute-env etc and all of this detail is captured in the
> > AwsBatchOperator already).  Assuming Airflow runs a DAG that spins up
> 100's
> > or 1000's of such external jobs and persists the external "jobId", when
> > Airflow crashes or is stopped for upgrades etc. and restarted, the
> > operators that submitted the jobs should be able to try to check on the
> > state of those previously submitted jobs.  If the jobs are still running
> on
> > the external provider (e.g. AWS Batch), it should be able to resume
> > monitoring (poking) the job status without re-submitting a duplicate job
> > (also any failure to poke a job should have some level of poke-retry
> > behavior that does not immediately fail the Airflow task that results in
> > somehow re-submitting the same job that is already running).  So, in that
> > context, what is the scope of the "reschedule-poke" changes - do they
> simply
> > release a worker and so long as Airflow is "up" (has not crashed), the
> > reschedule can resume poking, but if Airflow crashes, the whole thing
> > starts over again because the state of the task is not resilient to
> Airflow
> > crashing?  Or, does the work on the "reschedule-poke" also provide
> > resilience when Airflow crashes?  If the goal is to be resilient to
> Airflow
> > crashes, what is required for the "reschedule-poke" work to accomplish
> that
> > goal, if it doesn't already?  (Would the architecture for Airflow
> > resilience be out-of-scope in this context because it involves more
> > complexity, like a Kafka cluster?)
> >
> > -- Darren
> >
> >
> >
> > On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk 
> > wrote:
> >
> > > Commented as well. I think we are really going in a good direction!
> > >
> > > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko  >
> > > wrote:
> > >
> > > > Thanks Jacob for building the document. I think we're on the right
> > track.
> > > > I've added some comments and clarification to the document, to
> validate
> > > > we're looking in the same direction. Would love to get more people's
> > > > opinion on this.
> > > >
> > > > Cheers, Fokko
> > > >
> > > > On Wed, Jan 8, 2020 at 03:31, Jacob Ferriero
> > > > wrote:
> > > >
> > > > > Image not working on dev list here is link to the 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-10 Thread Driesprong, Fokko
The repoke logic as it is now implemented with the sensor, is able to
recover from an unexpected crash. After each poke, it will just go to
sleep. If the process crashes in between, it might become a zombie task in
the end, but this is also taken care of by the scheduler. In this case, the
scheduler thinks the task is still running, but in reality, it crashed.
There is a timeout that will reset the execution. Hopefully, this doesn't
happen often, and should only occur when something is off (for example a
machine crashed, or a network partition, etc). HTH

Personally I don't like duplicating the same table for such a similar use
case. But that's a design choice I guess.

If we go for the async executor, the above might be different. I think it
is good to not include this in the discussion.

Cheers, Fokko


On Thu, Jan 9, 2020 at 19:33, Darren Weber  wrote:

> Not sure whether to add to this email thread or the google-doc (not sure if
> that google-doc is just a meeting-notes or if it should evolve into a spec
> :grin:).
>
> Maybe a stupid suggestion, but here it is anyway:
>
> XCom - communication between elements of a DAG
>
> XState - key/value store available for each element of a DAG
>
> Clearly separate the behavior of a stateful resource (XState) from one that
> is not intended to be stateful (XCom), if that makes any sense?  (Creating
> a new XState feature is similar to a new db-table, I guess.)
>
> Just to explain what I understand about the goals of how Airflow should
> behave when it has some ability for an operator to reschedule pokes and the
> scope of the changes.  In the big picture, it's important that Airflow can
> resurrect a DAG on a restart when some elements of the DAG contain
> operators/sensors that are dependent on external cloud operations (e.g. AWS
> Batch).  This is feasible when Airflow can persist any unique job-ID
> defined by the external job provider (e.g. AWS Batch "jobId") and any
> related identifiers for the job (e.g. AWS Batch infrastructure ARNs for
> batch queue/compute-env etc and all of this detail is captured in the
> AwsBatchOperator already).  Assuming Airflow runs a DAG that spins up 100's
> or 1000's of such external jobs and persists the external "jobId", when
> Airflow crashes or is stopped for upgrades etc. and restarted, the
> operators that submitted the jobs should be able to try to check on the
> state of those previously submitted jobs.  If the jobs are still running on
> the external provider (e.g. AWS Batch), it should be able to resume
> monitoring (poking) the job status without re-submitting a duplicate job
> (also any failure to poke a job should have some level of poke-retry
> behavior that does not immediately fail the Airflow task that results in
> somehow re-submitting the same job that is already running).  So, in that
> context, what is the scope of the "reschedule-poke" changes - do they simply
> release a worker and so long as Airflow is "up" (has not crashed), the
> reschedule can resume poking, but if Airflow crashes, the whole thing
> starts over again because the state of the task is not resilient to Airflow
> crashing?  Or, does the work on the "reschedule-poke" also provide
> resilience when Airflow crashes?  If the goal is to be resilient to Airflow
> crashes, what is required for the "reschedule-poke" work to accomplish that
> goal, if it doesn't already?  (Would the architecture for Airflow
> resilience be out-of-scope in this context because it involves more
> complexity, like a Kafka cluster?)
>
> -- Darren
>
>
>
> On Wed, Jan 8, 2020 at 2:24 AM Jarek Potiuk 
> wrote:
>
> > Commented as well. I think we are really going in a good direction!
> >
> > On Wed, Jan 8, 2020 at 9:22 AM Driesprong, Fokko 
> > wrote:
> >
> > > Thanks Jacob for building the document. I think we're on the right
> track.
> > > I've added some comments and clarification to the document, to validate
> > > we're looking in the same direction. Would love to get more people's
> > > opinion on this.
> > >
> > > Cheers, Fokko
> > >
> > > Op wo 8 jan. 2020 om 03:31 schreef Jacob Ferriero
> > > :
> > >
> > > > Image not working on dev list here is link to the github review
> comment
> > > > containing said image:
> > > > https://github.com/apache/airflow/pull/6370#issuecomment-546582724.
> > > >
> > > > On Tue, Jan 7, 2020 at 5:40 PM Jacob Ferriero 
> > > > wrote:
> > > >
> > > >> Hello Dev List,
> > > >>
> > > >> The inspiration for this is to allow operators to start a long
> running
> > > >> task on an external system and reschedule pokes for completion (e.g
> > > spark
> > > >> job on dataproc), instead of blocking a worker (sketched out in
> #6210
> > > >> ) to allow freeing up
> of
> > > >> slots between pokes. To do this requires supporting a method for
> > storing
> > > >> task state between reschedules.
> > > >> It's worth noting that a task would maintain state only during
> > > >> reschedules but clear state on 

Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-09 Thread Darren Weber
Not sure whether to add to this email thread or the google-doc (not sure if
that google-doc is just meeting notes or if it should evolve into a spec
:grin:).

Maybe a stupid suggestion, but here it is anyway:

XCom - communication between elements of a DAG

XState - key/value store available for each element of a DAG

Clearly separate the behavior of a stateful resource (XState) from one that
is not intended to be stateful (XCom), if that makes any sense?  (Creating
a new XState feature is similar to a new db-table, I guess.)
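
To make the distinction concrete, a purely hypothetical sketch of what an
XState API could look like (every name here is invented for illustration;
nothing like this exists in Airflow today):

    # Hypothetical XState interface -- invented names, illustration only.
    class XState(object):
        """Key/value state scoped to (dag_id, task_id) that survives reschedules."""

        @classmethod
        def set(cls, dag_id, task_id, key, value):
            """Upsert a state row in a dedicated xstate table."""
            raise NotImplementedError

        @classmethod
        def get(cls, dag_id, task_id, key, default=None):
            """Fetch state by (dag_id, task_id, key)."""
            raise NotImplementedError

        @classmethod
        def clear(cls, dag_id, task_id):
            """Wipe state on retry or terminal state, never on reschedule."""
            raise NotImplementedError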

Just to explain what I understand about the goals of how Airflow should
behave when it has some ability for an operator to reschedule pokes, and
the scope of the changes. In the big picture, it's important that Airflow
can resurrect a DAG on a restart when some elements of the DAG contain
operators/sensors that depend on external cloud operations (e.g. AWS
Batch). This is feasible when Airflow can persist any unique job-ID defined
by the external job provider (e.g. the AWS Batch "jobId") and any related
identifiers for the job (e.g. the AWS Batch infrastructure ARNs for the
batch queue/compute-env; all of this detail is captured in the
AwsBatchOperator already).

Assuming Airflow runs a DAG that spins up 100s or 1000s of such external
jobs and persists the external "jobId", then when Airflow crashes, or is
stopped for upgrades and restarted, the operators that submitted the jobs
should be able to check on the state of those previously submitted jobs. If
the jobs are still running on the external provider (e.g. AWS Batch),
Airflow should be able to resume monitoring (poking) the job status without
re-submitting a duplicate job. (Also, any failure to poke a job should have
some level of poke-retry behavior that does not immediately fail the
Airflow task, since that would somehow re-submit the same job that is
already running.)

So, in that context, what is the scope of the "reschedule-poke" changes?
Do they simply release a worker, so that as long as Airflow is "up" (has
not crashed) the reschedule can resume poking, but if Airflow crashes the
whole thing starts over because the state of the task is not resilient to
Airflow crashing? Or does the work on the "reschedule-poke" also provide
resilience when Airflow crashes? If the goal is to be resilient to Airflow
crashes, what is required for the "reschedule-poke" work to accomplish that
goal, if it doesn't already? (Or would an architecture for Airflow
resilience be out-of-scope in this context because it involves more
complexity, like a Kafka cluster?)
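
Roughly what I have in mind, as a hypothetical sketch. The XCom key and the
helper function are invented, and it presupposes state that survives
reschedules and restarts, which is exactly what's under discussion; the
boto3 calls themselves are real APIs:

    # Hypothetical sketch: resume monitoring an AWS Batch job across an
    # Airflow restart instead of re-submitting it. Assumes the XCom (or
    # XState) entry survives reschedules/restarts.
    import boto3

    STATE_KEY = '_STATEFUL_batch_job_id'  # invented key name

    def poke_batch_job(ti, job_name, job_queue, job_definition):
        """Submit the job once; on later pokes (or after a crash) only check status."""
        batch = boto3.client('batch')
        job_id = ti.xcom_pull(task_ids=ti.task_id, key=STATE_KEY)
        if job_id is None:  # first poke ever: submit and persist the jobId
            resp = batch.submit_job(jobName=job_name, jobQueue=job_queue,
                                    jobDefinition=job_definition)
            job_id = resp['jobId']
            ti.xcom_push(key=STATE_KEY, value=job_id)
        # After a restart we land here with the persisted job_id and just poke.
        status = batch.describe_jobs(jobs=[job_id])['jobs'][0]['status']
        return status == 'SUCCEEDED'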

-- Darren




Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-08 Thread Jarek Potiuk
Commented as well. I think we are really going in a good direction!



-- 

Jarek Potiuk
Polidea | Principal Software Engineer

M: +48 660 796 129


Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-08 Thread Driesprong, Fokko
Thanks, Jacob, for building the document. I think we're on the right track.
I've added some comments and clarifications to the document, to validate
that we're looking in the same direction. Would love to get more people's
opinions on this.

Cheers, Fokko



Re: [Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-07 Thread Jacob Ferriero
The image isn't working on the dev list; here is a link to the GitHub
review comment containing said image:
https://github.com/apache/airflow/pull/6370#issuecomment-546582724



-- 

*Jacob Ferriero*

Strategic Cloud Engineer: Data Engineering

jferri...@google.com

617-714-2509


[Discussion] In Prep for AIP: Stateful XComs and Poke Rescheduling in Operators

2020-01-07 Thread Jacob Ferriero
Hello Dev List,

The inspiration for this is to allow operators to start a long-running task
on an external system and reschedule pokes for completion (e.g. a Spark job
on Dataproc), instead of blocking a worker (sketched out in #6210), to
allow freeing up of slots between pokes. Doing this requires supporting a
method for storing task state between reschedules.
It's worth noting that a task would maintain state only during reschedules
but clear state on retries. In this way the task is idempotent before
reaching a terminal state [SUCCESS, FAILED, UP_FOR_RETRY]. This brings up a
question about the scope of our commitment to idempotency of operators. If
it is deemed acceptable for reschedules to maintain some state, then we can
free up workers between pokes.
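
To make that lifecycle concrete, here is a purely hypothetical sensor
sketch relying on those semantics (state survives reschedules, a retry
starts from a clean slate); the submit_job/job_done helpers are invented
placeholders for real provider calls:

    # Hypothetical sketch: the XCom row survives reschedules but is
    # cleared on retry, per the proposal. Helpers are placeholders.
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults

    class ExternalJobSensor(BaseSensorOperator):

        @apply_defaults
        def __init__(self, *args, **kwargs):
            kwargs.setdefault('mode', 'reschedule')  # free the worker slot between pokes
            super(ExternalJobSensor, self).__init__(*args, **kwargs)

        def poke(self, context):
            ti = context['ti']
            job_id = ti.xcom_pull(task_ids=ti.task_id, key='_STATEFUL_job_id')
            if job_id is None:            # first poke, or a retry wiped the state
                job_id = self.submit_job()
                ti.xcom_push(key='_STATEFUL_job_id', value=job_id)
            return self.job_done(job_id)  # True completes the sensor

        def submit_job(self):
            raise NotImplementedError     # e.g. a dataproc/batch submit call

        def job_done(self, job_id):
            raise NotImplementedError     # e.g. poll the provider for status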

Because this is very similar to the purpose of XCom, it's been postulated
that we should support this behavior in XCom rather than provide a new
model in the db for TaskState. (Though discussion here on which is more
appropriate is more than welcome.)
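
For comparison, the TaskState alternative would boil down to a new table
along these lines (a hypothetical SQLAlchemy sketch, loosely mirroring how
XCom is modeled; not actual Airflow code):

    # Hypothetical TaskState model -- sketch only.
    from sqlalchemy import Column, Integer, LargeBinary, String
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()  # stand-in for Airflow's own declarative Base

    class TaskState(Base):
        __tablename__ = 'task_state'

        id = Column(Integer, primary_key=True)
        dag_id = Column(String(250), nullable=False)
        task_id = Column(String(250), nullable=False)
        key = Column(String(512), nullable=False)
        value = Column(LargeBinary)  # serialized payload, like XCom.value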

I'd like to put forward a proposal to resurrect the reverted #6370 in order
to provide a modification to the lifetime of XComs under certain
conditions. The diagram below helps illustrate the change originally
proposed in #6370. There was concern about changing existing behavior
(potentially breaking) and the fact that this makes operators stateful. Per
the review comments and an informal discussion (meeting notes
<https://docs.google.com/document/d/1uuNCPAcwnn0smcDUJPDFMMjrK-z6Z0osesPG7jVZ3oU/edit#>
and #sig-async-operators) I'd like to modify the approach in #6370 to only
skip clearing of XCom if the XCom key is prefixed with
`airflow.models.xcom.DO_NOT_CLEAR_PREFIX = "_STATEFUL_"` or similar.
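
In code, the clearing tweak would be roughly this (a sketch of the intent,
not the actual patch):

    # Sketch of the proposed clearing behavior: XCom rows whose key opts
    # in via the prefix survive into the next try; everything else is
    # cleared as today.
    from airflow.models import XCom

    DO_NOT_CLEAR_PREFIX = "_STATEFUL_"

    def clear_nonstateful_xcoms(session, dag_id, task_id, execution_date):
        session.query(XCom).filter(
            XCom.dag_id == dag_id,
            XCom.task_id == task_id,
            XCom.execution_date == execution_date,
            # autoescape because "_" is a LIKE wildcard and the prefix starts with one
            ~XCom.key.startswith(DO_NOT_CLEAR_PREFIX, autoescape=True),
        ).delete(synchronize_session=False)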

[image: image.png]
-- 

*Jacob Ferriero*

Strategic Cloud Engineer: Data Engineering

jferri...@google.com

617-714-2509