It might then be:

    DAG(
        ...
        timetable=CalendarTimeTable(
            # Wraps a time table
            TimeTable(
                execution_plan=…,
            ),
            # Whatever options define the calendar
            ...
        )
    )

The idea being that a calendar wraps an existing time table, skipping
certain days (e.g. holidays). There could be lots of ways to define those
days, but a rule-based approach is of course rather nice.
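
A minimal sketch of that wrapping idea, using stand-in classes rather than any real Airflow API. `DailyTimeTable`, `next_run_after`, `holidays` and `skip_weekends` are hypothetical names picked for illustration, and the constructor signature of `CalendarTimeTable` is an assumption as well:

```python
from datetime import date, timedelta


class DailyTimeTable:
    """Hypothetical inner time table: one run per calendar day."""

    def next_run_after(self, last: date) -> date:
        return last + timedelta(days=1)


class CalendarTimeTable:
    """Hypothetical wrapper that skips days the calendar excludes."""

    def __init__(self, inner, holidays=frozenset(), skip_weekends=True):
        self.inner = inner
        self.holidays = frozenset(holidays)
        self.skip_weekends = skip_weekends

    def is_included(self, day: date) -> bool:
        # Rule-based part: weekends first, then explicit holidays.
        if self.skip_weekends and day.weekday() >= 5:
            return False
        return day not in self.holidays

    def next_run_after(self, last: date) -> date:
        # Keep asking the wrapped time table until a day passes the rules.
        candidate = self.inner.next_run_after(last)
        while not self.is_included(candidate):
            candidate = self.inner.next_run_after(candidate)
        return candidate


# Friday 14 May 2021, with the following Monday declared a holiday.
timetable = CalendarTimeTable(DailyTimeTable(), holidays={date(2021, 5, 17)})
next_run = timetable.next_run_after(date(2021, 5, 14))
print(next_run)  # 2021-05-18
```

The loop assumes the calendar includes at least one future day; a real implementation would need a guard against calendars that exclude everything.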
As an aside, I do think "Schedule" is a better word because it's short,
rather obvious and doesn't have the Timetable vs TimeTable problem.
On Sat, 15 May 2021 at 04:06, TP Chung <[email protected]> wrote:
> I feel you are sort of operating on a different level of abstraction from
> AIP-39. While it is true that Airflow does generally take a declarative
> approach for scheduling currently (which is a good thing and should be
> continued), AIP-39 is more about providing a foundation so richer things
> can be declared. Its design does not preclude declarative things to be
> implemented, much like how most of Python is procedural in the first place,
> but that did not prevent Airflow from having a declarative interface.
>
> Timetable does not take away much of the declarative possibility, since we
> can easily have something like
>
> DAG(
>     ...
>     timetable=CalendarTimeTable(
>         calendar=…,
>         execution_plan=…,
>     ),
> )
>
> that implements what you want. But the nice thing about this extra
> abstraction is it keeps doors open for things that might not work well for
> calendars. You may argue those are uncommon cases, but what prompted AIP-39
> in the first place were uncommon cases not considered (or intentionally
> ignored for simplicity) by the original Airflow implementation in the first
> place. AIP-39 does well providing a good foundation for most flexibility
> without sacrificing much of the declarative goodness (if at all; it’s
> arguable the TimeTable class is actually an improvement for explicitness).
>
> TP
>
>
> On 14 May 2021, at 04:59, Malthe <[email protected]> wrote:
>
> When it comes to scheduling, Airflow does take a rather declarative
> approach I would say, but it is certainly correct that it very much stops
> there.
>
> I appreciate the arguments favoring a more object-oriented design, but I
> do think that adding a couple of additional scheduling options could go a
> very long way in terms of providing that extra bit of scheduling
> flexibility – while preserving the "scripting ergonomics".
>
> The current proposal leaves most of the interesting use-cases on the table
> rather than aiming to show that the abstraction actually meets the
> requirements.
>
> Cheers
>
> On Thu, 13 May 2021 at 15:01, Kaxil Naik <[email protected]> wrote:
>
>> And also the proposed items with Timetables are more "extensible" too --
>> Users can develop some classes for their own use and create a library for
>> reusing it.
>>
>> With arguments like the ones you are proposing, @malthe, it can be
>> difficult to keep track of all the "related" arguments needed to understand
>> the scheduling / schedule_interval.
>>
>> On Thu, May 13, 2021 at 3:46 PM Jarek Potiuk <[email protected]> wrote:
>>
>>> I am much more in favour of Ash's proposal on this one. I think we do not want to
>>> optimize for a number of changes but instead we want to make sure what we
>>> come up with is an easy and natural to use long-term solution. Even if it
>>> departs a lot from what we have now, a few months (or maybe years) after
>>> it becomes mainstream nobody will remember the "old way" hopefully.
>>>
>>> The idea of "pythonic" pluggable schedule rather than "declarative" way
>>> goes perfectly in-line with the whole concept of Airflow where DAGs are
>>> defined as Python code rather than declaratively. So making schedules
>>> follow the same approach seems very natural for anyone who uses Airflow.
>>>
>>>
>>> J.
>>>
>>>
>>> On Thu, May 13, 2021 at 9:27 AM Malthe <[email protected]> wrote:
>>>
>>>> I'm a bit late to the discussion, but it might be interesting to
>>>> explore a more simple approach, cutting back on the number of changes.
>>>>
>>>> As a general remark, "execution_date" may be a slightly difficult
>>>> concept to grasp, but changing it now is perhaps counterproductive.
>>>>
>>>> From the AIP examples:
>>>>
>>>> 1. The MON-FRI problem could be handled using an optional keyword
>>>> argument "execution_interval" which defaults to `None` (meaning automatic –
>>>> this is the current behavior). But instead a `timedelta` could be provided,
>>>> i.e. `timedelta(days=1)`.
>>>>
>>>> 2. This could easily be supported
>>>> <https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in
>>>> croniter.
>>>>
>>>> 3. The trading days only (or business days, etc) problem is handled in
>>>> other systems using a calendar option. It might be that an optional keyword
>>>> argument "calendar" could be used to automatically skip days that are not
>>>> included in the calendar – the default calendar would include all days. If
>>>> `calendar` is some calendar object, `~calendar` could be the inverse,
>>>> allowing a DAG to be easily scheduled on holidays. A requirement to
>>>> schedule on the nth day of the calendar (e.g. 10th business day of the
>>>> month) could be implemented using a derived calendar
>>>> `calendar.nth_day_of_month(10)` which would further filter down the number
>>>> of included days based on an existing calendar.
>>>>
>>>> 4. The cron- vs data scheduling seems to come down to whether the dag
>>>> run is kicked off at the start of the period or immediately after. This
>>>> could be handled using an optional keyword argument "execution_plan" which
>>>> defaults to INTERVAL_END (the current behavior), but can be optionally set
>>>> to INTERVAL_START. The "execution_date" column then remains unchanged, but
>>>> the actual dag run time will vary according to which execution plan was
>>>> specified.
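
To make item 3 concrete, here is a rough sketch of a rule-based calendar that supports `~calendar` and `nth_day_of_month`. The `Calendar` class and its predicate-based constructor are assumptions made for illustration only; nothing like this exists in Airflow or in the AIP:

```python
from datetime import date, timedelta
from typing import Callable


class Calendar:
    """Hypothetical rule-based calendar: a predicate over dates."""

    def __init__(self, includes: Callable[[date], bool]):
        self.includes = includes

    def __contains__(self, day: date) -> bool:
        return self.includes(day)

    def __invert__(self) -> "Calendar":
        # `~calendar` includes exactly the days the original excludes.
        return Calendar(lambda day: not self.includes(day))

    def nth_day_of_month(self, n: int) -> "Calendar":
        # Derived calendar: keep only the n-th included day of each month.
        def is_nth(day: date) -> bool:
            if not self.includes(day):
                return False
            first = day.replace(day=1)
            seen = sum(
                1
                for offset in range((day - first).days + 1)
                if self.includes(first + timedelta(days=offset))
            )
            return seen == n

        return Calendar(is_nth)


business_days = Calendar(lambda day: day.weekday() < 5)
non_business_days = ~business_days
tenth_business_day = business_days.nth_day_of_month(10)
```

In May 2021 the tenth weekday falls on the 14th, so `date(2021, 5, 14) in tenth_business_day` is true. A real calendar would also need holiday rules, which this sketch leaves out.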
>>>>
>>>> Cheers
>>>>
>>>> On Fri, 12 Mar 2021 at 07:02, Xinbin Huang <[email protected]>
>>>> wrote:
>>>>
>>>>> I agree with Tomek.
>>>>>
>>>>> TBH, *Timetable* to me does seem to be a complex concept, and I can't
>>>>> quite understand what it is at first sight.
>>>>>
>>>>> I think *Schedule* does convey the message better - consider
>>>>> the sentence: "*the Scheduler arranges jobs to run based on some
>>>>> _______.*" Here, *"schedules"* seems to fit better than
>>>>> *"timetables"*.
>>>>>
>>>>> As for search results on schedule vs scheduler, I wonder if
>>>>> reorganizing the docs to have `schedule` in the top section and have
>>>>> `scheduler` under operation::architecture will help with the search
>>>>> result?
>>>>> (I don't know much about SEO)
>>>>>
>>>>> Nevertheless, the naming shouldn't be a blocker for this feature to
>>>>> move forward.
>>>>>
>>>>> Best
>>>>> Bin
>>>>>
>>>>> On Thu, Mar 11, 2021 at 4:31 PM Tomasz Urbaszek <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> > Timetable is a synonym of ’Schedule’
>>>>>>
>>>>>> I'm not a native speaker and I don't get it easily as a synonym.
>>>>>>
>>>>>> To be honest the "timetable" sounds like a complex piece of software.
>>>>>> In my experience, people use schedule and schedule_interval
>>>>>> interchangeably. Additionally, schedule being more linked to scheduler
>>>>>> is imho an advantage because it suggests some connection between these
>>>>>> two.
>>>>>>
>>>>>> I feel that by introducing timetable we will bring yet more
>>>>>> complexity to airflow vocabulary. And I personally would treat it as yet
>>>>>> another moving part of an already complex system.
>>>>>>
>>>>>> I think we can move forward with this feature. We renamed "functional
>>>>>> DAGs" to "Taskflow API" so, naming is not a blocker. If we can't get
>>>>>> consensus we can always ask the users - they will use the feature.
>>>>>>
>>>>>> Best,
>>>>>> Tomek
>>>>>>
>>>>>>
>>>>>> On Fri, 12 Mar 2021 at 01:15, James Timmins <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> *Timetable vs Schedule*
>>>>>>> Re Timetable. I agree that if this was a greenfield project, it
>>>>>>> might make sense to use Schedule. But as it stands, we need to find the
>>>>>>> right balance between the most specific name and being sufficiently
>>>>>>> unique that it’s easy to work with in code and, perhaps most
>>>>>>> importantly, easy to find when searching on Google and in the Airflow
>>>>>>> Docs.
>>>>>>>
>>>>>>> There are more than 10,000 references to `schedule*` in the Airflow
>>>>>>> codebase. `schedule` and `scheduler` are also identical to most search
>>>>>>> engines/libraries, since they have the same stem, `schedule`. This means
>>>>>>> that when a user Googles `Airflow Schedule`, they will get back
>>>>>>> intermixed results of the Schedule class and the Scheduler.
>>>>>>>
>>>>>>> Timetable is a synonym of ’Schedule’, so it passes the accuracy
>>>>>>> test, won’t ever be ambiguous in code, and is distinct in search
>>>>>>> results.
>>>>>>>
>>>>>>> *Should "interval-less DAGs" have data_interval_start and end
>>>>>>> available in the context?*
>>>>>>> I think they should be present so it’s consistent across DAGs. Let’s
>>>>>>> not make users think too hard about what values are available in what
>>>>>>> context. What if someone sets the interval to 0? What if sometimes the
>>>>>>> interval is 0, and sometimes it’s 1 hour? Rather than changing the rules
>>>>>>> depending on usage, it’s easiest to have one rule that the users can
>>>>>>> depend upon.
>>>>>>>
>>>>>>> *Re set_context_variables()*
>>>>>>> What context is being defined here? The code comment says "Update or
>>>>>>> set new context variables to become available in task templates and
>>>>>>> operators.” The Timetable seems like the wrong place for variables that
>>>>>>> will get passed into task templates/operators, unless this is actually a
>>>>>>> way to pass Airflow macros into the Timetable context. In which case I
>>>>>>> fully support this. If not, we may want to add this functionality.
>>>>>>>
>>>>>>> James
>>>>>>> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <[email protected]>,
>>>>>>> wrote:
>>>>>>>
>>>>>>> Yup I have no strong opinions on either so happy to keep it
>>>>>>> TimeTable or if there is another suggestion.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Kaxil
>>>>>>>
>>>>>>> On Thu, Mar 11, 2021 at 5:00 PM James Timmins <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Respectfully, I strongly disagree with the renaming of Timetable to
>>>>>>>> Schedule. Schedule and Scheduler aren't meaningfully different, which
>>>>>>>> can lead to a lot of confusion. Even as a native English speaker, and
>>>>>>>> someone who works on Airflow full time, I routinely need to ask for
>>>>>>>> clarification about what schedule-related concept someone is referring
>>>>>>>> to. I foresee Schedule and Scheduler as two distinct yet closely
>>>>>>>> related concepts becoming a major source of confusion.
>>>>>>>>
>>>>>>>> If folks dislike Timetable, we could certainly change to something
>>>>>>>> else, but let's not use something so similar to existing Airflow
>>>>>>>> classes.
>>>>>>>>
>>>>>>>> -James
>>>>>>>>
>>>>>>>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Summary of changes so far on the AIP:
>>>>>>>>>
>>>>>>>>> My proposed rename of DagRun.execution_date is now
>>>>>>>>> DagRun.schedule_date (previously I had proposed run_date. Thanks
>>>>>>>>> dstandish!)
>>>>>>>>>
>>>>>>>>> Timetable classes are renamed to Schedule classes (CronSchedule
>>>>>>>>> etc), similarly the DAG argument is now schedule (reminder:
>>>>>>>>> schedule_interval will not be removed or deprecated, and will still
>>>>>>>>> be the way to use "simple" expressions)
>>>>>>>>>
>>>>>>>>> -ash
>>>>>>>>>
>>>>>>>>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Could change Timetable To Schedule -- that would mean the DAG arg
>>>>>>>>> becomes `schedule=CronSchedule(...)` -- a bit close to the current
>>>>>>>>> `schedule_interval` but I think clear enough difference.
>>>>>>>>>
>>>>>>>>> I do like the name but my one worry with "schedule" is that
>>>>>>>>> Scheduler and Schedule are very similar, and might be confused with
>>>>>>>>> each other by non-native English speakers? (I defer to others'
>>>>>>>>> judgment here, as this is not something I can experience myself.)
>>>>>>>>>
>>>>>>>>> @Kevin Yang <[email protected]> @Daniel Standish
>>>>>>>>> <[email protected]> any final input on this AIP?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Ash and all,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> What do people think of this? Worth it? Too complex to reason
>>>>>>>>>> about what context variables might exist as a result?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I think I wouldn't worry about it right now, or at least not as part
>>>>>>>>> of this AIP. Currently, in one of the GitHub issues, a user mentioned
>>>>>>>>> that it is not straightforward to know what is inside the context
>>>>>>>>> dictionary: https://github.com/apache/airflow/issues/14396. So maybe
>>>>>>>>> we can tackle this issue separately once the AbstractTimetable is
>>>>>>>>> built.
>>>>>>>>>
>>>>>>>>>> Should "interval-less DAGs" (ones using "CronTimetable" in my
>>>>>>>>>> proposal vs "DataTimetable") have data_interval_start and end
>>>>>>>>>> available in the context?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hmm, I would say no, but then it contradicts my suggestion to
>>>>>>>>> remove the context dict from this AIP. If we are going to use it in
>>>>>>>>> the scheduler then yes, where data_interval_start = data_interval_end
>>>>>>>>> from CronTimetable.
>>>>>>>>>
>>>>>>>>>> Does anyone have any better names than TimeDeltaTimetable,
>>>>>>>>>> DataTimetable, and CronTimetable? (We can probably change these
>>>>>>>>>> names right up until release, so not important to get this correct
>>>>>>>>>> *now*.)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> No strong opinion here. Just an alternate suggestion can
>>>>>>>>> be TimeDeltaSchedule, DataSchedule and CronSchedule
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Should I try to roll AIP-30
>>>>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>>>>>>>> in to this, or should we make that a future addition? (My vote is
>>>>>>>>>> for future addition)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I would vote for Future addition too.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Kaxil
>>>>>>>>>
>>>>>>>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I think, yes, AIP-35 or something like it would happily co-exist
>>>>>>>>>> with this proposal.
>>>>>>>>>>
>>>>>>>>>> @Daniel <[email protected]> and I have been discussing this a
>>>>>>>>>> bit on Slack, and one of the questions he asked was if the concept of
>>>>>>>>>> data_interval should be moved from DagRun, as James and I suggested,
>>>>>>>>>> down on to the individual task:
>>>>>>>>>>
>>>>>>>>>> suppose i have a new dag hitting 5 api endpoints and pulling data
>>>>>>>>>> to s3. suppose that yesterday 4 of them succeeded but one failed.
>>>>>>>>>> today, 4 of them should pull from yesterday. but the one that failed
>>>>>>>>>> should pull from 2 days back. so even though these normally have the
>>>>>>>>>> same interval, today they should not.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> My view on this is twofold: first, this should primarily be handled
>>>>>>>>>> by retries on the task; second, having different TaskInstances in
>>>>>>>>>> the same DagRun have different data intervals would be much harder
>>>>>>>>>> to reason about/design the UI around. For those reasons I still
>>>>>>>>>> think interval should be a DagRun-level concept.
>>>>>>>>>>
>>>>>>>>>> (He has a stalled AIP-30 where he proposed something to address
>>>>>>>>>> this kind of "watermark" case, which we might pick up next after
>>>>>>>>>> this AIP is complete)
>>>>>>>>>>
>>>>>>>>>> One thing we might want to do is extend the interface of
>>>>>>>>>> AbstractTimetable to be able to add/update parameters in the context
>>>>>>>>>> dict, so the interface could become this:
>>>>>>>>>>
>>>>>>>>>> class AbstractTimetable(ABC):
>>>>>>>>>>     @abstractmethod
>>>>>>>>>>     def next_dagrun_info(
>>>>>>>>>>         self,
>>>>>>>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>>>>>>>         session: Session,
>>>>>>>>>>     ) -> Optional[DagRunInfo]:
>>>>>>>>>>         """
>>>>>>>>>>         Get information about the next DagRun of this dag after
>>>>>>>>>>         ``date_last_automated_dagrun`` -- the execution date, and the
>>>>>>>>>>         earliest it could be scheduled
>>>>>>>>>>
>>>>>>>>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>>>>>>>>>             existing "automated" DagRuns for this dag (scheduled or
>>>>>>>>>>             backfill, but not manual)
>>>>>>>>>>         """
>>>>>>>>>>
>>>>>>>>>>     @abstractmethod
>>>>>>>>>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>>>>>>>>>>         """
>>>>>>>>>>         Update or set new context variables to become available
>>>>>>>>>>         in task templates and operators.
>>>>>>>>>>         """
>>>>>>>>>>
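
For illustration only, a cut-down, self-contained version of that interface with one concrete subclass. It uses the standard-library `datetime` instead of pendulum and drops the `Session` and `DagRun` arguments so that it runs without Airflow. The `DagRunInfo` fields, `SimplifiedTimetable` and `FixedDeltaTimetable` are assumptions, not the AIP's definitions:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional


@dataclass
class DagRunInfo:
    """Stand-in for the AIP's DagRunInfo; these fields are assumptions."""

    execution_date: datetime
    earliest_start: datetime


class SimplifiedTimetable(ABC):
    """Cut-down interface: no Session, and a plain datetime instead of DagRun."""

    @abstractmethod
    def next_dagrun_info(
        self, date_last_automated_dagrun: Optional[datetime]
    ) -> Optional[DagRunInfo]:
        ...

    @abstractmethod
    def set_context_variables(
        self, execution_date: datetime, context: Dict[str, Any]
    ) -> None:
        ...


class FixedDeltaTimetable(SimplifiedTimetable):
    """Hypothetical timetable with one run per fixed-length interval."""

    def __init__(self, start: datetime, delta: timedelta):
        self.start = start
        self.delta = delta

    def next_dagrun_info(self, date_last_automated_dagrun):
        if date_last_automated_dagrun is None:
            execution_date = self.start
        else:
            execution_date = date_last_automated_dagrun + self.delta
        # The run may start once the interval it covers has ended.
        return DagRunInfo(execution_date, execution_date + self.delta)

    def set_context_variables(self, execution_date, context):
        # Expose the interval to templates under the names used in this thread.
        context["data_interval_start"] = execution_date
        context["data_interval_end"] = execution_date + self.delta


tt = FixedDeltaTimetable(datetime(2021, 3, 1), timedelta(days=1))
first = tt.next_dagrun_info(None)
second = tt.next_dagrun_info(first.execution_date)
context: Dict[str, Any] = {}
tt.set_context_variables(first.execution_date, context)
```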
>>>>>>>>>>
>>>>>>>>>> What do people think of this? Worth it? Too complex to reason
>>>>>>>>>> about what context variables might exist as a result?
>>>>>>>>>>
>>>>>>>>>> *Outstanding questions*:
>>>>>>>>>>
>>>>>>>>>> - Should "interval-less DAGs" (ones using "CronTimetable" in
>>>>>>>>>>   my proposal vs "DataTimetable") have data_interval_start and end
>>>>>>>>>>   available in the context?
>>>>>>>>>> - Does anyone have any better names than TimeDeltaTimetable,
>>>>>>>>>>   DataTimetable, and CronTimetable? (We can probably change these
>>>>>>>>>>   names right up until release, so not important to get this correct
>>>>>>>>>>   *now*.)
>>>>>>>>>> - Should I try to roll AIP-30
>>>>>>>>>>   <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>>>>>>>>   in to this, or should we make that a future addition? (My vote is
>>>>>>>>>>   for future addition)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I'd like to start voting on this AIP next week (probably on
>>>>>>>>>> Tuesday) as I think this will be a powerful feature that eases
>>>>>>>>>> confusion for new users.
>>>>>>>>>>
>>>>>>>>>> -Ash
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based
>>>>>>>>>> Scheduling To Airflow"?
>>>>>>>>>> I think streaming was also discussed there (though it wasn't
>>>>>>>>>> really the use case).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <[email protected]>:
>>>>>>>>>>
>>>>>>>>>> Hi Kevin,
>>>>>>>>>>
>>>>>>>>>> Interesting idea. My original idea was actually for
>>>>>>>>>> "interval-less DAGs" (i.e. ones where it's just "run at this time")
>>>>>>>>>> to not have data_interval_start or end, but (while drafting the AIP)
>>>>>>>>>> we decided that it was probably "easier" if those values were always
>>>>>>>>>> datetimes.
>>>>>>>>>>
>>>>>>>>>> That said, I think having the DB model have those values be
>>>>>>>>>> nullable would future proof it without needing another migration to
>>>>>>>>>> change it. Do you think this is worth doing now?
>>>>>>>>>>
>>>>>>>>>> I haven't (yet! It's on my list) spent any significant time
>>>>>>>>>> thinking about how to make Airflow play nicely with streaming jobs.
>>>>>>>>>> If anyone else has ideas here please share them.
>>>>>>>>>>
>>>>>>>>>> -ash
>>>>>>>>>>
>>>>>>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Ash and James,
>>>>>>>>>>
>>>>>>>>>> This is an exciting move. What do you think about using this
>>>>>>>>>> opportunity to extend Airflow's support to streaming-like use cases?
>>>>>>>>>> I.e. DAGs/tasks that want to run forever like a service. For such
>>>>>>>>>> use cases, schedule interval might not be meaningful, so do we want
>>>>>>>>>> to make the date interval param optional for DagRun and task
>>>>>>>>>> instances? That sounds like a pretty major change to the underlying
>>>>>>>>>> model of Airflow, but this AIP is so far the best opportunity I have
>>>>>>>>>> seen to level up Airflow's support for streaming/service use cases.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Kevin Y
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Very excited to see this proposal come through and love the
>>>>>>>>>> direction this has gone.
>>>>>>>>>>
>>>>>>>>>> Couple comments...
>>>>>>>>>>
>>>>>>>>>> *Tree view / Data completeness view*
>>>>>>>>>>
>>>>>>>>>> When you design your tasks with the canonical idempotence
>>>>>>>>>> pattern, the tree view shows you both data completeness and task
>>>>>>>>>> execution history (success / failure etc).
>>>>>>>>>>
>>>>>>>>>> When you don't use that pattern (which is my general preference),
>>>>>>>>>> tree view is only task execution history.
>>>>>>>>>>
>>>>>>>>>> This change has the potential to unlock a data completeness view
>>>>>>>>>> for canonical tasks. It's possible that the "data completeness
>>>>>>>>>> view" can simply be the tree view. I.e. somehow it can use these new
>>>>>>>>>> classes to know what data was successfully filled and what data
>>>>>>>>>> wasn't.
>>>>>>>>>>
>>>>>>>>>> To the extent we like the idea of either extending / plugging /
>>>>>>>>>> modifying tree view, or adding a distinct data completeness view, we
>>>>>>>>>> might want to anticipate the needs of that in this change. And maybe
>>>>>>>>>> no alteration to the proposal would be needed but just want to throw
>>>>>>>>>> the idea out there.
>>>>>>>>>>
>>>>>>>>>> *Watermark workflow / incremental processing*
>>>>>>>>>>
>>>>>>>>>> A common pattern in data warehousing is pulling data
>>>>>>>>>> incrementally from a source.
>>>>>>>>>>
>>>>>>>>>> A standard way to achieve this is, at the start of the task, to
>>>>>>>>>> select max `updated_at` in the source table and hold on to that
>>>>>>>>>> value for a minute. This is your tentative new high watermark.
>>>>>>>>>> Now it's time to pull your data. If your task ran before, grab the
>>>>>>>>>> last high watermark. If not, use the initial load value.
>>>>>>>>>> If successful, update the high watermark.
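
The steps above can be sketched as a single function. This is a toy under stated assumptions: the source is an in-memory list of `(updated_at, payload)` rows with integer timestamps, the watermark store is a plain dict, and `incremental_pull` and its parameters are hypothetical names:

```python
from typing import Callable, Dict, List, Tuple

Row = Tuple[int, str]  # (updated_at, payload); integer timestamps for brevity


def incremental_pull(
    source: List[Row],
    state: Dict[str, int],
    key: str,
    initial_load_value: int,
    load: Callable[[List[Row]], None],
) -> List[Row]:
    # Last high watermark if the task ran before, else the initial load value.
    low = state.get(key, initial_load_value)
    # Tentative new high watermark, captured before any data is pulled.
    new_high = max((updated_at for updated_at, _ in source), default=low)
    # Pull only the rows in the half-open window (low, new_high].
    batch = [row for row in source if low < row[0] <= new_high]
    load(batch)  # if this raises, the stored watermark is left untouched
    # The pull succeeded, so persist the new high watermark.
    state[key] = new_high
    return batch


source = [(1, "a"), (2, "b")]
state: Dict[str, int] = {}
loaded: List[Row] = []

first = incremental_pull(source, state, "orders", 0, loaded.extend)
source.append((3, "c"))
second = incremental_pull(source, state, "orders", 0, loaded.extend)
```

The second call only sees the row added after the first watermark was stored.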
>>>>>>>>>>
>>>>>>>>>> On my team we implemented this with a stateful tasks / stateful
>>>>>>>>>> processes concept (there's a dormant draft AIP here
>>>>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>>>>>>>>>> and a WatermarkOperator that handled the boilerplate*.
>>>>>>>>>>
>>>>>>>>>> Again here, I don't have a specific suggestion at this moment.
>>>>>>>>>> But I wanted to articulate this workflow because it is common and it
>>>>>>>>>> wasn't immediately obvious to me, in reading AIP-39, how I would use
>>>>>>>>>> the proposal to implement it.
>>>>>>>>>>
>>>>>>>>>> AIP-39 makes airflow more data-aware. So if it can support this
>>>>>>>>>> kind of workflow that's great. @Ash Berlin-Taylor
>>>>>>>>>> <[email protected]> do you have thoughts on how it might be
>>>>>>>>>> compatible with this kind of thing as is?
>>>>>>>>>>
>>>>>>>>>> ---
>>>>>>>>>>
>>>>>>>>>> * The base operator is designed so that subclasses only need to
>>>>>>>>>> implement two methods:
>>>>>>>>>> - `get_high_watermark`: produce the tentative new high
>>>>>>>>>>   watermark
>>>>>>>>>> - `watermark_execute`: analogous to implementing poke in a
>>>>>>>>>>   sensor, this is where your work is done. `execute` is left to the
>>>>>>>>>>   base class, and it orchestrates (1) getting the last high watermark
>>>>>>>>>>   or initial load value and (2) updating the new high watermark if
>>>>>>>>>>   the job is successful.
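
A rough guess at the shape of such a base class, not the actual WatermarkOperator described above. The dict-backed state store, the `(low, high)` arguments passed to `watermark_execute`, and the `ListSourceOperator` subclass are all assumptions:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class WatermarkOperatorSketch(ABC):
    """Hypothetical base class; subclasses implement only the two hooks."""

    def __init__(self, task_id: str, state: Dict[str, Any], initial_load_value: Any):
        self.task_id = task_id
        self.state = state  # stand-in for persisted task state
        self.initial_load_value = initial_load_value

    @abstractmethod
    def get_high_watermark(self) -> Any:
        """Produce the tentative new high watermark."""

    @abstractmethod
    def watermark_execute(self, low: Any, high: Any) -> None:
        """Do the actual work for the window (low, high]."""

    def execute(self) -> None:
        # The base class owns the orchestration around the two hooks.
        high = self.get_high_watermark()
        low = self.state.get(self.task_id, self.initial_load_value)
        self.watermark_execute(low, high)
        # Only reached if watermark_execute did not raise.
        self.state[self.task_id] = high


class ListSourceOperator(WatermarkOperatorSketch):
    """Toy subclass that pulls integers from an in-memory list."""

    def __init__(self, task_id: str, state: Dict[str, Any], rows: List[int]):
        super().__init__(task_id, state, initial_load_value=0)
        self.rows = rows
        self.pulled: List[int] = []

    def get_high_watermark(self) -> int:
        return max(self.rows, default=self.initial_load_value)

    def watermark_execute(self, low: int, high: int) -> None:
        self.pulled = [r for r in self.rows if low < r <= high]


state: Dict[str, Any] = {}
op = ListSourceOperator("pull_rows", state, rows=[10, 20, 30])
op.execute()
first_pull = list(op.pulled)
op.rows.append(40)
op.execute()
```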
>>>>>>>>>>
>>>>>>>>>>