When it comes to scheduling, I would say Airflow does take a rather declarative approach, but it is certainly correct that it very much stops there.
I appreciate the arguments favoring a more object-oriented design, but I do think that adding a couple of additional scheduling options could go a very long way in terms of providing that extra bit of scheduling flexibility – while preserving the "scripting ergonomics". The current proposal leaves most of the interesting use cases on the table rather than aiming to show that the abstraction actually meets the requirements.

Cheers

On Thu, 13 May 2021 at 15:01, Kaxil Naik <[email protected]> wrote:

> And also, the proposed Timetables are more "extensible" -- users can develop classes for their own use and create a library for reuse.
>
> With arguments like the ones you are proposing, @malthe, it can be difficult to keep track of all the "related" arguments in order to understand the scheduling / schedule_interval.
>
> On Thu, May 13, 2021 at 3:46 PM Jarek Potiuk <[email protected]> wrote:
>
>> I am much more with Ash's proposal on this one. I think we do not want to optimize for the number of changes; instead we want to make sure that what we come up with is an easy and natural-to-use long-term solution. Even if it departs a lot from what we have now, a few months (or maybe years) after it becomes mainstream nobody will remember the "old way", hopefully.
>>
>> The idea of a "pythonic" pluggable schedule rather than a "declarative" one goes perfectly in line with the whole concept of Airflow, where DAGs are defined as Python code rather than declaratively. So making schedules follow the same approach seems very natural for anyone who uses Airflow.
>>
>> J.
>>
>> On Thu, May 13, 2021 at 9:27 AM Malthe <[email protected]> wrote:
>>
>>> I'm a bit late to the discussion, but it might be interesting to explore a simpler approach, cutting back on the number of changes.
>>>
>>> As a general remark, "execution_date" may be a slightly difficult concept to grasp, but changing it now is perhaps counterproductive.
>>>
>>> From the AIP examples:
>>>
>>> 1. The MON-FRI problem could be handled using an optional keyword argument "execution_interval" which defaults to `None` (meaning automatic – this is the current behavior). Instead, a `timedelta` could be provided, e.g. `timedelta(days=1)`.
>>>
>>> 2. This could easily be supported <https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in croniter.
>>>
>>> 3. The trading-days-only (or business days, etc.) problem is handled in other systems using a calendar option. It might be that an optional keyword argument "calendar" could be used to automatically skip days that are not included in the calendar – the default calendar would include all days. If `calendar` is some calendar object, `~calendar` could be the inverse, allowing a DAG to be easily scheduled on holidays. A requirement to schedule on the nth day of the calendar (e.g. the 10th business day of the month) could be implemented using a derived calendar `calendar.nth_day_of_month(10)`, which would further filter down the number of included days based on an existing calendar. A sketch of what such a calendar object might look like follows below.
>>>
>>> 4. The cron- vs data-scheduling question seems to come down to whether the dag run is kicked off at the start of the period or immediately after it. This could be handled using an optional keyword argument "execution_plan" which defaults to INTERVAL_END (the current behavior), but can optionally be set to INTERVAL_START. The "execution_date" column then remains unchanged, but the actual dag run time will vary according to which execution plan was specified.
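>>>
>>> To make (3) concrete, here is a minimal sketch -- `~calendar` and `nth_day_of_month` are the operations proposed above, but the Calendar class and its predicate-based implementation are assumptions for illustration only, not existing (or even proposed) Airflow API:
>>>
>>> from datetime import date, timedelta
>>>
>>> class Calendar:
>>>     def __init__(self, included):
>>>         self.included = included  # predicate: date -> bool
>>>
>>>     def __invert__(self):
>>>         # ~calendar: include exactly the days this calendar excludes,
>>>         # e.g. to schedule a DAG on holidays only
>>>         return Calendar(lambda d: not self.included(d))
>>>
>>>     def nth_day_of_month(self, n):
>>>         # Derived calendar keeping only the nth included day of each month
>>>         def predicate(d):
>>>             if not self.included(d):
>>>                 return False
>>>             day, count = d.replace(day=1), 0
>>>             while day <= d:
>>>                 count += self.included(day)
>>>                 day += timedelta(days=1)
>>>             return count == n
>>>         return Calendar(predicate)
>>>
>>> business_days = Calendar(lambda d: d.weekday() < 5)
>>> assert business_days.nth_day_of_month(10).included(date(2021, 5, 14))  # 10th business day of May 2021
>>>
>>> Combined with (1) and (4), a DAG might then be declared as, say, `DAG(..., execution_interval=timedelta(days=1), calendar=business_days, execution_plan=INTERVAL_START)` -- again, all hypothetical argument names.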
>>>
>>> Cheers
>>>
>>> On Fri, 12 Mar 2021 at 07:02, Xinbin Huang <[email protected]> wrote:
>>>
>>>> I agree with Tomek.
>>>>
>>>> TBH, *Timetable* to me does seem to be a complex concept, and I can't quite understand what it is at first sight.
>>>>
>>>> I think *Schedule* does convey the message better - consider the sentence: "*the Scheduler arranges jobs to run based on some _______.*" Here, *"schedules"* seems to fit better than *"timetables"*.
>>>>
>>>> As for the search results on schedule vs scheduler, I wonder if reorganizing the docs to have `schedule` in the top section and `scheduler` under operation::architecture would help with the search results? (I don't know much about SEO)
>>>>
>>>> Nevertheless, the naming shouldn't be a blocker for this feature to move forward.
>>>>
>>>> Best
>>>> Bin
>>>>
>>>> On Thu, Mar 11, 2021 at 4:31 PM Tomasz Urbaszek <[email protected]> wrote:
>>>>
>>>>> > Timetable is a synonym of 'Schedule'
>>>>>
>>>>> I'm not a native speaker, and I don't easily get it as a synonym.
>>>>>
>>>>> To be honest, "timetable" sounds like a complex piece of software. Personally, I have experienced that people use schedule and schedule_interval interchangeably. Additionally, schedule being more closely linked to scheduler is imho an advantage, because it suggests some connection between the two.
>>>>>
>>>>> I feel that by introducing timetable we will bring yet more complexity to the Airflow vocabulary. And I personally would treat it as yet another moving part of an already complex system.
>>>>>
>>>>> I think we can move forward with this feature. We renamed "functional DAGs" to "Taskflow API", so naming is not a blocker. If we can't get consensus, we can always ask the users - they will use the feature.
>>>>>
>>>>> Best,
>>>>> Tomek
>>>>>
>>>>> On Fri, 12 Mar 2021 at 01:15, James Timmins <[email protected]> wrote:
>>>>>
>>>>>> *Timetable vs Schedule*
>>>>>> Re Timetable. I agree that if this were a greenfield project, it might make sense to use Schedule. But as it stands, we need to find the right balance between the most specific name and one sufficiently unique that it's easy to work with in code and, perhaps most importantly, easy to find when searching on Google and in the Airflow docs.
>>>>>>
>>>>>> There are more than 10,000 references to `schedule*` in the Airflow codebase. `schedule` and `scheduler` are also identical to most search engines/libraries, since they share the stem `schedule`. This means that when a user Googles `Airflow Schedule`, they will get back intermixed results for the Schedule class and the Scheduler.
>>>>>>
>>>>>> Timetable is a synonym of 'Schedule', so it passes the accuracy test, won't ever be ambiguous in code, and is distinct in search results.
>>>>>>
>>>>>> *Should "interval-less DAGs" have data_interval_start and end available in the context?*
>>>>>> I think they should be present so it's consistent across DAGs. Let's not make users think too hard about what values are available in what context. What if someone sets the interval to 0? What if sometimes the interval is 0, and sometimes it's 1 hour? Rather than changing the rules depending on usage, it's easiest to have one rule that users can depend upon.
>>>>>>
>>>>>> *Re set_context_variables()*
>>>>>> What context is being defined here? The code comment says "Update or set new context variables to become available in task templates and operators." The Timetable seems like the wrong place for variables that will get passed into task templates/operators, unless this is actually a way to pass Airflow macros into the Timetable context, in which case I fully support this. If not, we may want to add this functionality.
>>>>>>
>>>>>> James
>>>>>>
>>>>>> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <[email protected]>, wrote:
>>>>>>
>>>>>> Yup, I have no strong opinions either way, so I am happy to keep it TimeTable, or to go with another suggestion if there is one.
>>>>>>
>>>>>> Regards,
>>>>>> Kaxil
>>>>>>
>>>>>> On Thu, Mar 11, 2021 at 5:00 PM James Timmins <[email protected]> wrote:
>>>>>>
>>>>>>> Respectfully, I strongly disagree with the renaming of Timetable to Schedule. Schedule and Scheduler aren't meaningfully different, which can lead to a lot of confusion. Even as a native English speaker, and someone who works on Airflow full time, I routinely need to ask for clarification about which schedule-related concept someone is referring to. I foresee Schedule and Scheduler, as two distinct yet closely related concepts, becoming a major source of confusion.
>>>>>>>
>>>>>>> If folks dislike Timetable, we could certainly change to something else, but let's not use something so similar to existing Airflow classes.
>>>>>>>
>>>>>>> -James
>>>>>>>
>>>>>>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <[email protected]> wrote:
>>>>>>>
>>>>>>>> Summary of changes so far on the AIP:
>>>>>>>>
>>>>>>>> My proposed rename of DagRun.execution_date is now DagRun.schedule_date (previously I had proposed run_date. Thanks dstandish!)
>>>>>>>>
>>>>>>>> Timetable classes are renamed to Schedule classes (CronSchedule etc.); similarly, the DAG argument is now schedule (reminder: schedule_interval will not be removed or deprecated, and will still be the way to use "simple" expressions)
>>>>>>>>
>>>>>>>> -ash
>>>>>>>>
>>>>>>>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Could change Timetable to Schedule -- that would mean the DAG arg becomes `schedule=CronSchedule(...)` -- a bit close to the current `schedule_interval`, but I think a clear enough difference.
>>>>>>>>
>>>>>>>> I do like the name, but my one worry with "schedule" is that Scheduler and Schedule are very similar, and might be confused with each other by non-native English speakers? (I defer to others' judgment here, as this is not something I can experience myself.)
>>>>>>>>
>>>>>>>> @Kevin Yang <[email protected]> @Daniel Standish <[email protected]> any final input on this AIP?
>>>>>>>>
>>>>>>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi Ash and all,
>>>>>>>>
>>>>>>>>> What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
>>>>>>>>
>>>>>>>> I think I wouldn't worry about it right now, or at least not as part of this AIP. Currently, in one of the GitHub issues, a user mentioned that it is not straightforward to know what is inside the context dictionary: https://github.com/apache/airflow/issues/14396. So maybe we can tackle that issue separately once the AbstractTimetable is built.
>>>>>>>>
>>>>>>>>> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
>>>>>>>>
>>>>>>>> Hmm, I would say no, but then that contradicts my suggestion to remove the context dict from this AIP. If we are going to use it in the scheduler then yes, where data_interval_start = data_interval_end for CronTimetable.
>>>>>>>>
>>>>>>>>> Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct *now*.)
>>>>>>>>
>>>>>>>> No strong opinion here. Just an alternate suggestion: TimeDeltaSchedule, DataSchedule and CronSchedule.
>>>>>>>>
>>>>>>>>> Should I try to roll AIP-30 <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> in to this, or should we make that a future addition? (My vote is for future addition)
>>>>>>>>
>>>>>>>> I would vote for future addition too.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Kaxil
>>>>>>>>
>>>>>>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I think, yes, AIP-35 or something like it would happily co-exist with this proposal.
>>>>>>>>>
>>>>>>>>> @Daniel <[email protected]> and I have been discussing this a bit on Slack, and one of the questions he asked was whether the concept of data_interval should be moved from the DagRun, as James and I suggested, down to the individual task:
>>>>>>>>>
>>>>>>>>> suppose i have a new dag hitting 5 api endpoints and pulling data to s3. suppose that yesterday 4 of them succeeded but one failed. today, 4 of them should pull from yesterday. but the one that failed should pull from 2 days back. so even though these normally have the same interval, today they should not.
>>>>>>>>>
>>>>>>>>> My view on this is twofold: one, this should primarily be handled by retries on the task; and two, having different TaskInstances in the same DagRun with different data intervals would be much harder to reason about and to design the UI around. So for those reasons I still think the interval should be a DagRun-level concept.
>>>>>>>>>
>>>>>>>>> (He has a stalled AIP-30, where he proposed something to address this kind of "watermark" case, which we might pick up next after this AIP is complete.)
>>>>>>>>>
>>>>>>>>> One thing we might want to do is extend the interface of AbstractTimetable to be able to add/update parameters in the context dict, so the interface could become this:
>>>>>>>>>
>>>>>>>>> class AbstractTimetable(ABC):
>>>>>>>>>     @abstractmethod
>>>>>>>>>     def next_dagrun_info(
>>>>>>>>>         self,
>>>>>>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>>>>>>         session: Session,
>>>>>>>>>     ) -> Optional[DagRunInfo]:
>>>>>>>>>         """
>>>>>>>>>         Get information about the next DagRun of this dag after
>>>>>>>>>         ``date_last_automated_dagrun`` -- the execution date, and
>>>>>>>>>         the earliest it could be scheduled.
>>>>>>>>>
>>>>>>>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>>>>>>>>             existing "automated" DagRuns for this dag (scheduled or
>>>>>>>>>             backfill, but not manual)
>>>>>>>>>         """
>>>>>>>>>
>>>>>>>>>     @abstractmethod
>>>>>>>>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>>>>>>>>>         """
>>>>>>>>>         Update or set new context variables to become available
>>>>>>>>>         in task templates and operators.
>>>>>>>>>         """
>>>>>>>>>
>>>>>>>>> What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
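>>>>>>>>>
>>>>>>>>> (As one concrete data point, a sketch of how DataTimetable might implement the hook -- this body is purely illustrative; the context keys and DagRun attributes below are assumptions, not settled names:
>>>>>>>>>
>>>>>>>>> class DataTimetable(AbstractTimetable):
>>>>>>>>>     ...
>>>>>>>>>
>>>>>>>>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>>>>>>>>>         # Surface this run's data interval to task templates and operators
>>>>>>>>>         context["data_interval_start"] = dagrun.data_interval_start
>>>>>>>>>         context["data_interval_end"] = dagrun.data_interval_end
>>>>>>>>>
>>>>>>>>> That would at least keep each timetable's extra context variables discoverable in a single method.)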
>>>>>>>>>
>>>>>>>>> *Outstanding questions*:
>>>>>>>>>
>>>>>>>>> - Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
>>>>>>>>> - Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct *now*.)
>>>>>>>>> - Should I try to roll AIP-30 <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> in to this, or should we make that a future addition? (My vote is for future addition)
>>>>>>>>>
>>>>>>>>> I'd like to start voting on this AIP next week (probably on Tuesday), as I think this will be a powerful feature that eases confusion for new users.
>>>>>>>>>
>>>>>>>>> -Ash
>>>>>>>>>
>>>>>>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To Airflow"? I think streaming was also discussed there (though it wasn't really the use case).
>>>>>>>>>
>>>>>>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <[email protected]>:
>>>>>>>>>
>>>>>>>>> Hi Kevin,
>>>>>>>>>
>>>>>>>>> Interesting idea. My original idea was actually that "interval-less DAGs" (i.e. ones where it's just "run at this time") would not have data_interval_start or end, but (while drafting the AIP) we decided that it was probably "easier" if those values were always datetimes.
>>>>>>>>>
>>>>>>>>> That said, I think making those values nullable in the DB model would future-proof it without needing another migration to change it later. Do you think this is worth doing now?
>>>>>>>>>
>>>>>>>>> I haven't (yet! It's on my list) spent any significant time thinking about how to make Airflow play nicely with streaming jobs. If anyone else has ideas here, please share them.
>>>>>>>>>
>>>>>>>>> -ash
>>>>>>>>>
>>>>>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Ash and James,
>>>>>>>>>
>>>>>>>>> This is an exciting move. What do you think about using this opportunity to extend Airflow's support to streaming-like use cases, i.e. DAGs/tasks that want to run forever, like a service? For such use cases a schedule interval might not be meaningful, so do we want to make the date interval param optional for DagRuns and task instances? That sounds like a pretty major change to the underlying model of Airflow, but this AIP is so far the best opportunity I have seen to level up Airflow's support for streaming/service use cases.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kevin Y
>>>>>>>>>
>>>>>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Very excited to see this proposal come through and love the direction this has gone.
>>>>>>>>>
>>>>>>>>> A couple of comments...
>>>>>>>>>
>>>>>>>>> *Tree view / Data completeness view*
>>>>>>>>>
>>>>>>>>> When you design your tasks with the canonical idempotence pattern, the tree view shows you both data completeness and task execution history (success / failure etc).
>>>>>>>>>
>>>>>>>>> When you don't use that pattern (which is my general preference), the tree view is only task execution history.
>>>>>>>>>
>>>>>>>>> This change has the potential to unlock a data completeness view for canonical tasks. It's possible that the "data completeness view" can simply be the tree view, i.e. somehow it can use these new classes to know what data was successfully filled and what data wasn't.
>>>>>>>>>
>>>>>>>>> To the extent we like the idea of either extending / plugging / modifying the tree view, or adding a distinct data completeness view, we might want to anticipate the needs of that in this change. Maybe no alteration to the proposal is needed, but I want to throw the idea out there.
>>>>>>>>>
>>>>>>>>> *Watermark workflow / incremental processing*
>>>>>>>>>
>>>>>>>>> A common pattern in data warehousing is pulling data incrementally from a source.
>>>>>>>>>
>>>>>>>>> A standard way to achieve this is: at the start of the task, select the max `updated_at` in the source table and hold on to that value for a minute -- this is your tentative new high watermark. Now it's time to pull your data. If your task ran before, grab the last high watermark; if not, use the initial load value. If successful, update the high watermark.
>>>>>>>>>
>>>>>>>>> On my team we implemented this with a stateful tasks / stateful processes concept (there's a dormant draft AIP here <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) and a WatermarkOperator that handled the boilerplate*.
>>>>>>>>>
>>>>>>>>> Again here, I don't have a specific suggestion at this moment. But I wanted to articulate this workflow because it is common, and it wasn't immediately obvious to me on reading AIP-39 how I would use it to implement this.
>>>>>>>>>
>>>>>>>>> AIP-39 makes Airflow more data-aware, so if it can support this kind of workflow, that's great. @Ash Berlin-Taylor <[email protected]> do you have thoughts on how it might be compatible with this kind of thing as is?
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>>
>>>>>>>>> * The base operator is designed so that subclasses only need to implement two methods:
>>>>>>>>> - `get_high_watermark`: produce the tentative new high watermark
>>>>>>>>> - `watermark_execute`: analogous to implementing poke in a sensor, this is where your work is done. `execute` is left to the base class, and it orchestrates (1) getting the last high watermark or the initial load value and (2) updating the new high watermark if the job succeeded.
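>>>>>>>>>
>>>>>>>>> For illustration, a minimal sketch of that base class -- the two hook methods are as described above, while `get_state`/`set_state` are hypothetical persistence helpers in the spirit of the dormant AIP-30 draft, not an existing Airflow API:
>>>>>>>>>
>>>>>>>>> from abc import abstractmethod
>>>>>>>>> from typing import Any
>>>>>>>>>
>>>>>>>>> from airflow.models.baseoperator import BaseOperator
>>>>>>>>>
>>>>>>>>> class WatermarkOperator(BaseOperator):
>>>>>>>>>     def __init__(self, *, initial_load_value: Any, **kwargs) -> None:
>>>>>>>>>         super().__init__(**kwargs)
>>>>>>>>>         self.initial_load_value = initial_load_value
>>>>>>>>>
>>>>>>>>>     @abstractmethod
>>>>>>>>>     def get_high_watermark(self, context) -> Any:
>>>>>>>>>         """Produce the tentative new high watermark, e.g. max updated_at in the source."""
>>>>>>>>>
>>>>>>>>>     @abstractmethod
>>>>>>>>>     def watermark_execute(self, context, low: Any, high: Any) -> None:
>>>>>>>>>         """Like poke in a sensor: do the work, e.g. pull rows where low < updated_at <= high."""
>>>>>>>>>
>>>>>>>>>     def execute(self, context):
>>>>>>>>>         # (1) Capture the tentative new high watermark before pulling data
>>>>>>>>>         high = self.get_high_watermark(context)
>>>>>>>>>         # (2) Last committed watermark if the task ran before, else the initial load value
>>>>>>>>>         low = self.get_state(default=self.initial_load_value)  # hypothetical AIP-30-style helper
>>>>>>>>>         self.watermark_execute(context, low=low, high=high)
>>>>>>>>>         # (3) Commit the new watermark only after the work succeeded
>>>>>>>>>         self.set_state(high)  # hypothetical AIP-30-style helper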
