When it comes to scheduling, Airflow does take a rather declarative
approach I would say, but it is certainly correct that it very much stops
there.

I appreciate the arguments favoring a more object-oriented design, but I do
think that adding a couple of additional scheduling options could go a very
long way in terms of providing that extra bit of scheduling flexibility –
while preserving the "scripting ergonomics".
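
A minimal sketch of one such option, the `execution_plan` keyword suggested in the quoted message below. The names `ExecutionPlan` and `run_time` are made up for illustration and are not part of Airflow; the only assumption is that "execution_date" marks the start of the interval, as it does today.

```python
from datetime import datetime, timedelta
from enum import Enum


class ExecutionPlan(Enum):
    """Hypothetical values for an `execution_plan` keyword argument."""

    INTERVAL_END = "interval_end"      # current behavior: run once the interval has passed
    INTERVAL_START = "interval_start"  # run as soon as the interval begins


def run_time(
    execution_date: datetime,
    interval: timedelta,
    plan: ExecutionPlan = ExecutionPlan.INTERVAL_END,
) -> datetime:
    """Return when the run for `execution_date` would actually be kicked off.

    `execution_date` itself is unchanged by the plan; only the kick-off
    time differs.
    """
    if plan is ExecutionPlan.INTERVAL_START:
        return execution_date
    return execution_date + interval
```

With a daily interval, `run_time(datetime(2021, 5, 13), timedelta(days=1))` lands on 14 May, while passing `ExecutionPlan.INTERVAL_START` keeps it on 13 May.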

The current proposal leaves most of the interesting use-cases on the table
rather than aiming to show that the abstraction actually meets the
requirements.

Cheers

On Thu, 13 May 2021 at 15:01, Kaxil Naik <[email protected]> wrote:

> And also the proposed items with Timetables are more "extensible" too --
> Users can develop some classes for their own use and create a library for
> reusing it.
>
> With arguments like the ones you are proposing, @malthe, it can be difficult
> to keep track of all the "related" arguments needed to understand the
> scheduling / schedule_interval.
>
> On Thu, May 13, 2021 at 3:46 PM Jarek Potiuk <[email protected]> wrote:
>
>> I am much more in favor of Ash's proposal on this one. I think we do not
>> want to optimize for the number of changes but instead we want to make sure
>> what we come up with is an easy and natural to use long-term solution. Even
>> if it departs a lot from what we have now, a few months (or maybe years)
>> after it becomes mainstream nobody will remember the "old way" hopefully.
>>
>> The idea of a "pythonic" pluggable schedule rather than a "declarative" one
>> goes perfectly in line with the whole concept of Airflow where DAGs are
>> defined as Python code rather than declaratively. So making schedules
>> follow the same approach seems very natural for anyone who uses Airflow.
>>
>>
>> J.
>>
>>
>> On Thu, May 13, 2021 at 9:27 AM Malthe <[email protected]> wrote:
>>
>>> I'm a bit late to the discussion, but it might be interesting to explore
>>> a simpler approach, cutting back on the number of changes.
>>>
>>> As a general remark, "execution_date" may be a slightly difficult
>>> concept to grasp, but changing it now is perhaps counterproductive.
>>>
>>> From the AIP examples:
>>>
>>> 1. The MON-FRI problem could be handled using an optional keyword
>>> argument "execution_interval" which defaults to `None` (meaning automatic –
>>> this is the current behavior). Alternatively, a `timedelta` could be
>>> provided, e.g. `timedelta(days=1)`.
>>>
>>> 2. This could easily be supported
>>> <https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in
>>> croniter.
>>>
>>> 3. The trading days only (or business days, etc) problem is handled in
>>> other systems using a calendar option. It might be that an optional keyword
>>> argument "calendar" could be used to automatically skip days that are not
>>> included in the calendar – the default calendar would include all days. If
>>> `calendar` is some calendar object, `~calendar` could be the inverse,
>>> allowing a DAG to be easily scheduled on holidays. A requirement to
>>> schedule on the nth day of the calendar (e.g. 10th business day of the
>>> month) could be implemented using a derived calendar
>>> `calendar.nth_day_of_month(10)` which would further filter down the number
>>> of included days based on an existing calendar.
>>>
>>> 4. The cron- vs data scheduling seems to come down to whether the dag
>>> run is kicked off at the start of the period or immediately after. This
>>> could be handled using an optional keyword argument "execution_plan" which
>>> defaults to INTERVAL_END (the current behavior), but can be optionally set
>>> to INTERVAL_START. The "execution_date" column then remains unchanged, but
>>> the actual dag run time will vary according to which execution plan was
>>> specified.
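
A minimal sketch of the calendar idea in item 3, under stated assumptions: the `Calendar` class and its `includes` predicate are hypothetical and not an existing Airflow or croniter API, and plain weekdays stand in for a real business-day calendar.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Callable


@dataclass(frozen=True)
class Calendar:
    """Hypothetical calendar: a predicate deciding which days are included."""

    includes: Callable[[date], bool]

    def __contains__(self, day: date) -> bool:
        return self.includes(day)

    def __invert__(self) -> "Calendar":
        # `~calendar` includes exactly the days the original excludes.
        return Calendar(lambda day: not self.includes(day))

    def nth_day_of_month(self, n: int) -> "Calendar":
        # Derived calendar: keeps only the n-th included day of each month.
        def is_nth(day: date) -> bool:
            if not self.includes(day):
                return False
            first = day.replace(day=1)
            # Count included days from the 1st of the month up to `day`.
            count = sum(
                1
                for offset in range((day - first).days + 1)
                if self.includes(first + timedelta(days=offset))
            )
            return count == n

        return Calendar(is_nth)


# Illustrative only: weekdays stand in for a real business-day calendar.
weekdays = Calendar(lambda day: day.weekday() < 5)
tenth_business_day = weekdays.nth_day_of_month(10)
```

A scheduler using such an object would simply skip any candidate day for which `day in calendar` is false; how that check would be wired into a DAG is left open here.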
>>>
>>> Cheers
>>>
>>> On Fri, 12 Mar 2021 at 07:02, Xinbin Huang <[email protected]>
>>> wrote:
>>>
>>>> I agree with Tomek.
>>>>
>>>> TBH, *Timetable* to me does seem to be a complex concept, and I can't
>>>> quite understand what it is at first sight.
>>>>
>>>> I think *Schedule* does convey the message better - consider
>>>> the sentence: "*the Scheduler arranges jobs to run based on some
>>>> _______.*" Here, *"schedules"* seems to fit better than
>>>> *"timetables"*.
>>>>
>>>> As for search results on schedule vs scheduler, I wonder if
>>>> reorganizing the docs to have `schedule` in the top section and have
>>>> `scheduler` under operation::architecture will help with the search result?
>>>> (I don't know much about SEO)
>>>>
>>>> Nevertheless, the naming shouldn't be a blocker for this feature to
>>>> move forward.
>>>>
>>>> Best
>>>> Bin
>>>>
>>>> On Thu, Mar 11, 2021 at 4:31 PM Tomasz Urbaszek <[email protected]>
>>>> wrote:
>>>>
>>>>> > Timetable is a synonym of 'Schedule'
>>>>>
>>>>> I'm not a native speaker and I don't get it easily as a synonym.
>>>>>
>>>>> To be honest the "timetable" sounds like a complex piece of software.
>>>>> Personally I experienced that people use schedule and
>>>>> schedule_interval interchangeably. Additionally, schedule being more
>>>>> linked to scheduler imho is an advantage because it suggests some
>>>>> connection between these two.
>>>>>
>>>>> I feel that by introducing timetable we will bring yet more complexity
>>>>> to airflow vocabulary. And I personally would treat it as yet another
>>>>> moving part of an already complex system.
>>>>>
>>>>> I think we can move forward with this feature. We renamed "functional
>>>>> DAGs" to "Taskflow API" so, naming is not a blocker. If we can't get
>>>>> consensus we can always ask the users - they will use the feature.
>>>>>
>>>>> Best,
>>>>> Tomek
>>>>>
>>>>>
>>>>> On Fri, 12 Mar 2021 at 01:15, James Timmins
>>>>> <[email protected]> wrote:
>>>>>
>>>>>> *Timetable vs Schedule*
>>>>>> Re Timetable. I agree that if this was a greenfield project, it might
>>>>>> make sense to use Schedule. But as it stands, we need to find the right
>>>>>> balance between the most specific name and being sufficiently unique that
>>>>>> it’s easy to work with in code and, perhaps most importantly, easy to
>>>>>> find when searching on Google and in the Airflow Docs.
>>>>>>
>>>>>> There are more than 10,000 references to `schedule*` in the Airflow
>>>>>> codebase. `schedule` and `scheduler` are also identical to most search
>>>>>> engines/libraries, since they have the same stem, `schedule`. This means
>>>>>> that when a user Googles `Airflow Schedule`, they will get back
>>>>>> intermixed results of the Schedule class and the Scheduler.
>>>>>>
>>>>>> Timetable is a synonym of 'Schedule', so it passes the accuracy test,
>>>>>> won’t ever be ambiguous in code, and is distinct in search results.
>>>>>>
>>>>>> *Should "interval-less DAGs" have data_interval_start and end
>>>>>> available in the context?*
>>>>>> I think they should be present so it’s consistent across DAGs. Let’s
>>>>>> not make users think too hard about what values are available in what
>>>>>> context. What if someone sets the interval to 0? What if sometimes the
>>>>>> interval is 0, and sometimes it’s 1 hour? Rather than changing the rules
>>>>>> depending on usage, it’s easiest to have one rule that the users can
>>>>>> depend upon.
>>>>>>
>>>>>> *Re set_context_variables()*
>>>>>> What context is being defined here? The code comment says "Update or
>>>>>> set new context variables to become available in task templates and
>>>>>> operators." The Timetable seems like the wrong place for variables that
>>>>>> will get passed into task templates/operators, unless this is actually a
>>>>>> way to pass Airflow macros into the Timetable context. In which case I
>>>>>> fully support this. If not, we may want to add this functionality.
>>>>>>
>>>>>> James
>>>>>> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <[email protected]>,
>>>>>> wrote:
>>>>>>
>>>>>> Yup I have no strong opinions on either so happy to keep it TimeTable
>>>>>> or if there is another suggestion.
>>>>>>
>>>>>> Regards,
>>>>>> Kaxil
>>>>>>
>>>>>> On Thu, Mar 11, 2021 at 5:00 PM James Timmins
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> Respectfully, I strongly disagree with the renaming of Timetable to
>>>>>>> Schedule. Schedule and Scheduler aren't meaningfully different, which
>>>>>>> can lead to a lot of confusion. Even as a native English speaker, and
>>>>>>> someone who works on Airflow full time, I routinely need to ask for
>>>>>>> clarification about what schedule-related concept someone is referring
>>>>>>> to. I foresee Schedule and Scheduler as two distinct yet closely related
>>>>>>> concepts becoming a major source of confusion.
>>>>>>>
>>>>>>> If folks dislike Timetable, we could certainly change to something
>>>>>>> else, but let's not use something so similar to existing Airflow
>>>>>>> classes.
>>>>>>>
>>>>>>> -James
>>>>>>>
>>>>>>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Summary of changes so far on the AIP:
>>>>>>>>
>>>>>>>> My proposed rename of DagRun.execution_date is now
>>>>>>>> DagRun.schedule_date (previously I had proposed run_date. Thanks
>>>>>>>> dstandish!)
>>>>>>>>
>>>>>>>> Timetable classes are renamed to Schedule classes (CronSchedule
>>>>>>>> etc), similarly the DAG argument is now schedule (reminder:
>>>>>>>> schedule_interval will not be removed or deprecated, and will still be
>>>>>>>> the way to use "simple" expressions)
>>>>>>>>
>>>>>>>> -ash
>>>>>>>>
>>>>>>>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Could change Timetable To Schedule -- that would mean the DAG arg
>>>>>>>> becomes `schedule=CronSchedule(...)` -- a bit close to the current
>>>>>>>> `schedule_interval` but I think clear enough difference.
>>>>>>>>
>>>>>>>> I do like the name but my one worry with "schedule" is that
>>>>>>>> Scheduler and Schedule are very similar, and might be confused with
>>>>>>>> each other by non-native English speakers? (I defer to others' judgment
>>>>>>>> here, as this is not something I can experience myself.)
>>>>>>>>
>>>>>>>> @Kevin Yang <[email protected]> @Daniel Standish
>>>>>>>> <[email protected]> any final input on this AIP?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Ash and all,
>>>>>>>>
>>>>>>>>
>>>>>>>> What do people think of this? Worth it? Too complex to reason about
>>>>>>>>> what context variables might exist as a result?
>>>>>>>>
>>>>>>>>
>>>>>>>> I think I wouldn't worry about it right now or maybe not as part of
>>>>>>>> this AIP. Currently, in one of the GitHub issues, a user mentioned that
>>>>>>>> it is not straightforward to know what is inside the context dictionary:
>>>>>>>> https://github.com/apache/airflow/issues/14396. So maybe we can
>>>>>>>> tackle this issue separately once the AbstractTimetable is built.
>>>>>>>>
>>>>>>>> Should "interval-less DAGs" (ones using "CronTimetable" in my
>>>>>>>>> proposal vs "DataTimetable") have data_interval_start and end
>>>>>>>>> available in the context?
>>>>>>>>
>>>>>>>>
>>>>>>>> hmm.. I would say No but then it contradicts my suggestion to
>>>>>>>> remove context dict from this AIP. If we are going to use it in
>>>>>>>> scheduler then yes, where data_interval_start = data_interval_end from
>>>>>>>> CronTimetable.
>>>>>>>>
>>>>>>>> Does anyone have any better names than TimeDeltaTimetable,
>>>>>>>>> DataTimetable, and CronTimetable? (We can probably change these names
>>>>>>>>> right up until release, so not important to get this correct *now*.)
>>>>>>>>
>>>>>>>>
>>>>>>>> No strong opinion here. Just an alternate suggestion can
>>>>>>>> be TimeDeltaSchedule, DataSchedule and CronSchedule
>>>>>>>>
>>>>>>>>
>>>>>>>>> Should I try to roll AIP-30
>>>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>>>>>>> in to this, or should we make that a future addition? (My vote is for
>>>>>>>>> future addition)
>>>>>>>>
>>>>>>>>
>>>>>>>> I would vote for Future addition too.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Kaxil
>>>>>>>>
>>>>>>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think, yes, AIP-35 or something like it would happily co-exist
>>>>>>>>> with this proposal.
>>>>>>>>>
>>>>>>>>> @Daniel <[email protected]> and I have been discussing this a
>>>>>>>>> bit on Slack, and one of the questions he asked was if the concept of
>>>>>>>>> data_interval should be moved from DagRun as James and I suggested
>>>>>>>>> down on to the individual task:
>>>>>>>>>
>>>>>>>>> suppose i have a new dag hitting 5 api endpoints and pulling data
>>>>>>>>> to s3. suppose that yesterday 4 of them succeeded but one failed.
>>>>>>>>> today, 4 of them should pull from yesterday. but the one that failed
>>>>>>>>> should pull from 2 days back. so even though these normally have the
>>>>>>>>> same interval, today they should not.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My view on this is twofold: first, this should primarily be handled
>>>>>>>>> by retries on the task, and second, having different TaskInstances
>>>>>>>>> in the same DagRun have different data intervals would be much harder
>>>>>>>>> to reason about/design the UI around, so for those reasons I still
>>>>>>>>> think interval should be a DagRun-level concept.
>>>>>>>>>
>>>>>>>>> (He has a stalled AIP-30 where he proposed something to address
>>>>>>>>> this kind of "watermark" case, which we might pick up next after this
>>>>>>>>> AIP is complete)
>>>>>>>>>
>>>>>>>>> One thing we might want to do is extend the interface of
>>>>>>>>> AbstractTimetable to be able to add/update parameters in the context
>>>>>>>>> dict, so the interface could become this:
>>>>>>>>>
>>>>>>>>> from abc import ABC, abstractmethod
>>>>>>>>> from typing import Any, Dict, Optional
>>>>>>>>>
>>>>>>>>> import pendulum
>>>>>>>>> from sqlalchemy.orm import Session
>>>>>>>>>
>>>>>>>>> # DagRun and DagRunInfo are the types referred to in the AIP.
>>>>>>>>>
>>>>>>>>> class AbstractTimetable(ABC):
>>>>>>>>>     @abstractmethod
>>>>>>>>>     def next_dagrun_info(
>>>>>>>>>         self,
>>>>>>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>>>>>>         session: Session,
>>>>>>>>>     ) -> Optional[DagRunInfo]:
>>>>>>>>>         """
>>>>>>>>>         Get information about the next DagRun of this dag after
>>>>>>>>>         ``date_last_automated_dagrun`` -- the execution date, and the
>>>>>>>>>         earliest it could be scheduled
>>>>>>>>>
>>>>>>>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>>>>>>>>             existing "automated" DagRuns for this dag (scheduled or
>>>>>>>>>             backfill, but not manual)
>>>>>>>>>         """
>>>>>>>>>
>>>>>>>>>     @abstractmethod
>>>>>>>>>     def set_context_variables(
>>>>>>>>>         self, dagrun: DagRun, context: Dict[str, Any]
>>>>>>>>>     ) -> None:
>>>>>>>>>         """
>>>>>>>>>         Update or set new context variables to become available in
>>>>>>>>>         task templates and operators.
>>>>>>>>>         """
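
A minimal sketch of how a concrete timetable might implement the interface quoted above, so the two methods can be seen working together. It is deliberately simplified and rests on assumptions: `DagRunInfo` is given a made-up two-field shape, plain `datetime` replaces `pendulum.DateTime`, the `session` argument is dropped, and `set_context_variables` takes the execution date rather than a `DagRun`. `FixedDeltaTimetable` is a hypothetical name.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Dict, NamedTuple, Optional


class DagRunInfo(NamedTuple):
    # Hypothetical shape, assumed for this sketch only.
    execution_date: datetime
    earliest_schedule_time: datetime


class AbstractTimetable(ABC):
    # Simplified copy of the quoted interface (no session, no DagRun).
    @abstractmethod
    def next_dagrun_info(
        self, date_last_automated_dagrun: Optional[datetime]
    ) -> Optional[DagRunInfo]:
        ...

    @abstractmethod
    def set_context_variables(
        self, execution_date: datetime, context: Dict[str, Any]
    ) -> None:
        ...


class FixedDeltaTimetable(AbstractTimetable):
    """Runs once per fixed interval, after that interval has ended."""

    def __init__(self, start: datetime, delta: timedelta) -> None:
        self.start = start
        self.delta = delta

    def next_dagrun_info(
        self, date_last_automated_dagrun: Optional[datetime]
    ) -> Optional[DagRunInfo]:
        if date_last_automated_dagrun is None:
            # No automated run yet: begin at the configured start.
            execution_date = self.start
        else:
            execution_date = date_last_automated_dagrun + self.delta
        return DagRunInfo(execution_date, execution_date + self.delta)

    def set_context_variables(
        self, execution_date: datetime, context: Dict[str, Any]
    ) -> None:
        # Expose the data interval to templates and operators.
        context["data_interval_start"] = execution_date
        context["data_interval_end"] = execution_date + self.delta
```

An "interval-less" timetable would differ only in `set_context_variables`, for example by setting both values to the same datetime, which is the open question raised in this thread.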
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What do people think of this? Worth it? Too complex to reason
>>>>>>>>> about what context variables might exist as a result?
>>>>>>>>>
>>>>>>>>> *Outstanding question*:
>>>>>>>>>
>>>>>>>>>    - Should "interval-less DAGs" (ones using "CronTimetable" in
>>>>>>>>>    my proposal vs "DataTimetable") have data_interval_start and end
>>>>>>>>>    available in the context?
>>>>>>>>>    - Does anyone have any better names than TimeDeltaTimetable,
>>>>>>>>>    DataTimetable, and CronTimetable? (We can probably change these
>>>>>>>>>    names right up until release, so not important to get this correct
>>>>>>>>>    *now*.)
>>>>>>>>>    - Should I try to roll AIP-30
>>>>>>>>>    <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>>>>>>>    in to this, or should we make that a future addition? (My vote is
>>>>>>>>>    for future addition)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I'd like to start voting on this AIP next week (probably on
>>>>>>>>> Tuesday) as I think this will be a powerful feature that eases
>>>>>>>>> confusion for new users.
>>>>>>>>>
>>>>>>>>> -Ash
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based
>>>>>>>>> Scheduling To Airflow"?
>>>>>>>>> I think streaming was also discussed there (though it wasn't
>>>>>>>>> really the use case).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <[email protected]>:
>>>>>>>>>
>>>>>>>>> Hi Kevin,
>>>>>>>>>
>>>>>>>>> Interesting idea. My original idea was actually that "interval-less
>>>>>>>>> DAGs" (i.e. ones where it's just "run at this time") would not have
>>>>>>>>> data_interval_start or end, but (while drafting the AIP) we decided
>>>>>>>>> that it was probably "easier" if those values were always datetimes.
>>>>>>>>>
>>>>>>>>> That said, I think having the DB model have those values be
>>>>>>>>> nullable would future proof it without needing another migration to
>>>>>>>>> change it. Do you think this is worth doing now?
>>>>>>>>>
>>>>>>>>> I haven't (yet! It's on my list) spent any significant time
>>>>>>>>> thinking about how to make Airflow play nicely with streaming jobs. If
>>>>>>>>> anyone else has ideas here please share them
>>>>>>>>>
>>>>>>>>> -ash
>>>>>>>>>
>>>>>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Ash and James,
>>>>>>>>>
>>>>>>>>> This is an exciting move. What do you think about using this
>>>>>>>>> opportunity to extend Airflow's support to streaming-like use cases,
>>>>>>>>> i.e. DAGs/tasks that want to run forever like a service? For such use
>>>>>>>>> cases, schedule interval might not be meaningful, so do we want to make
>>>>>>>>> the date interval param optional for DagRun and task instances? That
>>>>>>>>> sounds like a pretty major change to the underlying model of Airflow,
>>>>>>>>> but this AIP is so far the best opportunity I have seen to level up
>>>>>>>>> Airflow's support for streaming/service use cases.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kevin Y
>>>>>>>>>
>>>>>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Very excited to see this proposal come through and love the
>>>>>>>>> direction this has gone.
>>>>>>>>>
>>>>>>>>> Couple comments...
>>>>>>>>>
>>>>>>>>> *Tree view / Data completeness view*
>>>>>>>>>
>>>>>>>>> When you design your tasks with the canonical idempotence pattern,
>>>>>>>>> the tree view shows you both data completeness and task execution
>>>>>>>>> history (success / failure etc).
>>>>>>>>>
>>>>>>>>> When you don't use that pattern (which is my general preference),
>>>>>>>>> tree view is only task execution history.
>>>>>>>>>
>>>>>>>>> This change has the potential to unlock a data completeness view
>>>>>>>>> for canonical tasks.  It's possible that the "data completeness view"
>>>>>>>>> can simply be the tree view.  I.e. somehow it can use these new classes
>>>>>>>>> to know what data was successfully filled and what data wasn't.
>>>>>>>>>
>>>>>>>>> To the extent we like the idea of either extending / plugging /
>>>>>>>>> modifying tree view, or adding a distinct data completeness view, we
>>>>>>>>> might want to anticipate the needs of that in this change.  And maybe
>>>>>>>>> no alteration to the proposal would be needed but just want to throw
>>>>>>>>> the idea out there.
>>>>>>>>>
>>>>>>>>> *Watermark workflow / incremental processing*
>>>>>>>>>
>>>>>>>>> A common pattern in data warehousing is pulling data incrementally
>>>>>>>>> from a source.
>>>>>>>>>
>>>>>>>>> A standard way to achieve this is at the start of the task, select
>>>>>>>>> max `updated_at` in source table and hold on to that value for a
>>>>>>>>> minute. This is your tentative new high watermark.
>>>>>>>>> Now it's time to pull your data.  If your task ran before, grab
>>>>>>>>> last high watermark.  If not, use initial load value.
>>>>>>>>> If successful, update high watermark.
>>>>>>>>>
>>>>>>>>> On my team we implemented this with a stateful tasks / stateful
>>>>>>>>> processes concept (there's a dormant draft AIP here
>>>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>>>>>>>>> and a WatermarkOperator that handled the boilerplate*.
>>>>>>>>>
>>>>>>>>> Again here, I don't have a specific suggestion at this moment.
>>>>>>>>> But I wanted to articulate this workflow because it is common and it
>>>>>>>>> wasn't immediately obvious to me in reading AIP-39 how I would use it
>>>>>>>>> to implement it.
>>>>>>>>>
>>>>>>>>> AIP-39 makes airflow more data-aware.  So if it can support this
>>>>>>>>> kind of workflow that's great.  @Ash Berlin-Taylor
>>>>>>>>> <[email protected]> do you have thoughts on how it might be
>>>>>>>>> compatible with this kind of thing as is?
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>>
>>>>>>>>> * The base operator is designed so that subclasses only need to
>>>>>>>>> implement two methods:
>>>>>>>>>     - `get_high_watermark`: produce the tentative new high
>>>>>>>>> watermark
>>>>>>>>>     - `watermark_execute`: analogous to implementing poke in a
>>>>>>>>> sensor, this is where your work is done. `execute` is left to the base
>>>>>>>>> class, and it orchestrates (1) getting last high watermark or initial
>>>>>>>>> load value and (2) updating new high watermark if job successful.
>>>>>>>>>
>>>>>>>>>
>>
>> --
>> +48 660 796 129
>>
>
