Also, the proposed Timetable classes are more "extensible" -- users can develop classes for their own use cases and package them as a library for reuse.
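For illustration, a reusable timetable in such a library might look roughly like this. This is a hypothetical sketch only -- the class shape, method name, and return type below are invented here and may differ from the final AIP-39 API:

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


class WeekdayTimetable:
    """Hypothetical user-defined timetable: one daily run, Mon-Fri only.

    Sketch under assumed names -- the real AIP-39 base class and its
    method signatures may differ from what is shown here.
    """

    def __init__(self, start_date: datetime):
        self.start_date = start_date

    def next_dagrun_info(
        self, last_schedule_date: Optional[datetime]
    ) -> Tuple[datetime, datetime]:
        """Return (data_interval_start, data_interval_end) for the next run."""
        start = (
            self.start_date
            if last_schedule_date is None
            else last_schedule_date + timedelta(days=1)
        )
        # Skip forward past Saturday (5) and Sunday (6)
        while start.weekday() >= 5:
            start += timedelta(days=1)
        return start, start + timedelta(days=1)
```

A team could collect timetables like this in an internal package and share them across many DAGs, which is the kind of reuse the proposal enables.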
Using arguments like the ones you are proposing, @malthe, it can be difficult to understand how all the "related" arguments combine to determine the scheduling / schedule_interval. On Thu, May 13, 2021 at 3:46 PM Jarek Potiuk <[email protected]> wrote: > I am much more in favor of Ash's proposal with this one. I think we do not want to > optimize for the number of changes but instead we want to make sure what we > come up with is an easy and natural-to-use long-term solution. Even if it > departs a lot from what we have now, a few months (or maybe years) after > it becomes mainstream hopefully nobody will remember the "old way". > > The idea of a "pythonic" pluggable schedule rather than a "declarative" one > goes perfectly in line with the whole concept of Airflow, where DAGs are > defined as Python code rather than declaratively. So making schedules > follow the same approach seems very natural for anyone who uses Airflow. > > > J. > > > On Thu, May 13, 2021 at 9:27 AM Malthe <[email protected]> wrote: > >> I'm a bit late to the discussion, but it might be interesting to explore >> a simpler approach, cutting back on the number of changes. >> >> As a general remark, "execution_date" may be a slightly difficult concept >> to grasp, but changing it now is perhaps counterproductive. >> >> From the AIP examples: >> >> 1. The MON-FRI problem could be handled using an optional keyword >> argument "execution_interval" which defaults to `None` (meaning automatic – >> this is the current behavior), but a `timedelta` could be provided >> instead, e.g. `timedelta(days=1)`. >> >> 2. This could easily be supported >> <https://github.com/kiorky/croniter/pull/46#issuecomment-838544908> in >> croniter. >> >> 3. The trading-days-only (or business days, etc.) problem is handled in >> other systems using a calendar option. It might be that an optional keyword >> argument "calendar" could be used to automatically skip days that are not >> included in the calendar – the default calendar would include all days.
If >> `calendar` is some calendar object, `~calendar` could be the inverse, >> allowing a DAG to be easily scheduled on holidays. A requirement to >> schedule on the nth day of the calendar (e.g. the 10th business day of the >> month) could be implemented using a derived calendar >> `calendar.nth_day_of_month(10)` which would further filter down the number >> of included days based on an existing calendar. >> >> 4. The cron vs. data scheduling question seems to come down to whether the dag run >> is kicked off at the start of the period or immediately after. This could >> be handled using an optional keyword argument "execution_plan" which >> defaults to INTERVAL_END (the current behavior), but can be optionally set >> to INTERVAL_START. The "execution_date" column then remains unchanged, but >> the actual dag run time will vary according to which execution plan was >> specified. >> >> Cheers >> >> On Fri, 12 Mar 2021 at 07:02, Xinbin Huang <[email protected]> wrote: >> >>> I agree with Tomek. >>> >>> TBH, *Timetable* to me does seem to be a complex concept, and I can't >>> quite understand what it is at first sight. >>> >>> I think *Schedule* does convey the message better - consider >>> the sentence: "*the Scheduler arranges jobs to run based on some >>> _______*." Here, *"schedules"* seem to fit better than *"timetables"*. >>> >>> As for search results on schedule vs scheduler, I wonder if reorganizing >>> the docs to have `schedule` in the top section and have `scheduler` under >>> operation::architecture would help with the search results? (I don't know >>> much about SEO) >>> >>> Nevertheless, the naming shouldn't be a blocker for this feature to move >>> forward. >>> >>> Best >>> Bin >>> >>> On Thu, Mar 11, 2021 at 4:31 PM Tomasz Urbaszek <[email protected]> >>> wrote: >>> >>>> > Timetable is a synonym of ’Schedule’ >>>> >>>> I'm not a native speaker and I don't easily get it as a synonym. >>>> >>>> To be honest, "timetable" sounds like a complex piece of software.
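Malthe's calendar idea above (the `~calendar` inverse and `nth_day_of_month` derivation) is easiest to see in code. Everything below is invented here for illustration; neither Airflow nor the AIP defines such a Calendar type:

```python
from datetime import date, timedelta


class Calendar:
    """Hypothetical calendar object: a predicate over dates."""

    def __init__(self, included):
        self._included = included  # callable: date -> bool

    def __contains__(self, d: date) -> bool:
        return self._included(d)

    def __invert__(self) -> "Calendar":
        # ~calendar: includes exactly the days this calendar excludes,
        # allowing a DAG to be scheduled e.g. only on holidays.
        return Calendar(lambda d: not self._included(d))

    def nth_day_of_month(self, n: int) -> "Calendar":
        # Derived calendar keeping only the nth included day of each month.
        def pred(d: date) -> bool:
            if not self._included(d):
                return False
            count, cur = 0, d.replace(day=1)
            while cur <= d:
                if self._included(cur):
                    count += 1
                cur += timedelta(days=1)
            return count == n

        return Calendar(pred)


# Business days = Mon-Fri; its inverse covers the remaining days.
business_days = Calendar(lambda d: d.weekday() < 5)
weekends = ~business_days
```

With this shape, "10th business day of the month" is just `business_days.nth_day_of_month(10)`, a further-filtered calendar as described above.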
>>>> Personally, I have seen people use schedule and >>>> schedule_interval interchangeably. Additionally, schedule being more closely linked >>>> to scheduler is imho an advantage because it suggests some connection >>>> between the two. >>>> >>>> I feel that by introducing timetable we will bring yet more complexity >>>> to the Airflow vocabulary. And I personally would treat it as yet another >>>> moving part of an already complex system. >>>> >>>> I think we can move forward with this feature. We renamed "functional >>>> DAGs" to "Taskflow API", so naming is not a blocker. If we can't get >>>> consensus we can always ask the users - they will use the feature. >>>> >>>> Best, >>>> Tomek >>>> >>>> >>>> On Fri, 12 Mar 2021 at 01:15, James Timmins <[email protected]> >>>> wrote: >>>> >>>>> *Timetable vs Schedule* >>>>> Re Timetable. I agree that if this were a greenfield project, it might >>>>> make sense to use Schedule. But as it stands, we need to find the right >>>>> balance between the most specific name and one sufficiently unique that >>>>> it’s easy to work with in code and, perhaps most importantly, easy to find >>>>> when searching on Google and in the Airflow docs. >>>>> >>>>> There are more than 10,000 references to `schedule*` in the Airflow >>>>> codebase. `schedule` and `scheduler` are also identical to most search >>>>> engines/libraries, since they have the same stem, `schedule`. This means >>>>> that when a user Googles `Airflow Schedule`, they will get back intermixed >>>>> results for the Schedule class and the Scheduler. >>>>> >>>>> Timetable is a synonym of ’Schedule’, so it passes the accuracy test, >>>>> won’t ever be ambiguous in code, and is distinct in search results. >>>>> >>>>> *Should "interval-less DAGs” have data_interval_start and end >>>>> available in the context?* >>>>> I think they should be present so it’s consistent across DAGs. Let’s >>>>> not make users think too hard about what values are available in what >>>>> context.
What if someone sets the interval to 0? What if sometimes the >>>>> interval is 0, and sometimes it’s 1 hour? Rather than changing the rules >>>>> depending on usage, it’s easiest to have one rule that the users can >>>>> depend >>>>> upon. >>>>> >>>>> *Re set_context_variables()* >>>>> What context is being defined here? The code comment says "Update or >>>>> set new context variables to become available in task templates and >>>>> operators.” The Timetable seems like the wrong place for variables that >>>>> will get passed into task templates/operators, unless this is actually a >>>>> way to pass Airflow macros into the Timetable context. In which case I >>>>> fully support this. If not, we may want to add this functionality. >>>>> >>>>> James >>>>> On Mar 11, 2021, 9:40 AM -0800, Kaxil Naik <[email protected]>, >>>>> wrote: >>>>> >>>>> Yup I have no strong opinions on either so happy to keep it TimeTable >>>>> or if there is another suggestion. >>>>> >>>>> Regards, >>>>> Kaxil >>>>> >>>>> On Thu, Mar 11, 2021 at 5:00 PM James Timmins >>>>> <[email protected]> wrote: >>>>> >>>>>> Respectfully, I strongly disagree with the renaming of Timetable to >>>>>> Schedule. Schedule and Scheduler aren't meaningfully different, which can >>>>>> lead to a lot of confusion. Even as a native English speaker, and someone >>>>>> who works on Airflow full time, I routinely need to ask for clarification >>>>>> about what schedule-related concept someone is referring to. I foresee >>>>>> Schedule and Scheduler as two distinct yet closely related concepts >>>>>> becoming a major source of confusion. >>>>>> >>>>>> If folks dislike Timetable, we could certainly change to something >>>>>> else, but let's not use something so similar to existing Airflow classes. 
>>>>>> >> >>>>>> -James >>>>>> >>>>>> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Summary of changes so far on the AIP: >>>>>>> >>>>>>> My proposed rename of DagRun.execution_date is now >>>>>>> DagRun.schedule_date (previously I had proposed run_date. Thanks >>>>>>> dstandish!) >>>>>>> >>>>>>> Timetable classes are renamed to Schedule classes (CronSchedule >>>>>>> etc.), and similarly the DAG argument is now schedule (reminder: >>>>>>> schedule_interval will not be removed or deprecated, and will still be the >>>>>>> way to use "simple" expressions) >>>>>>> >>>>>>> -ash >>>>>>> >>>>>>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> We could change Timetable to Schedule -- that would mean the DAG arg >>>>>>> becomes `schedule=CronSchedule(...)` -- a bit close to the current >>>>>>> `schedule_interval`, but I think the difference is clear enough. >>>>>>> >>>>>>> I do like the name, but my one worry with "schedule" is that >>>>>>> Scheduler and Schedule are very similar, and might be confused with each >>>>>>> other by non-native English speakers? (I defer to others' judgment here, >>>>>>> as this is not something I can experience myself.) >>>>>>> >>>>>>> @Kevin Yang <[email protected]> @Daniel Standish >>>>>>> <[email protected]> any final input on this AIP? >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> Hi Ash and all, >>>>>>> >>>>>>> >>>>>>> What do people think of this? Worth it? Too complex to reason about >>>>>>>> what context variables might exist as a result? >>>>>>> >>>>>>> >>>>>>> I think I wouldn't worry about it right now, or maybe not as part of >>>>>>> this AIP. Currently, in one of the GitHub issues, a user mentioned that it >>>>>>> is not straightforward to know what is inside the context dictionary: >>>>>>> https://github.com/apache/airflow/issues/14396.
So maybe we can >>>>>>> tackle this issue separately once the AbstractTimetable is built. >>>>>>> >>>>>>> Should "interval-less DAGs" (ones using "CronTimetable" in my >>>>>>>> proposal vs "DataTimetable") have data_interval_start and end >>>>>>>> available in >>>>>>>> the context? >>>>>>> >>>>>>> >>>>>>> hmm.. I would say No but then it contradicts my suggestion to remove >>>>>>> context dict from this AIP. If we are going to use it in scheduler then >>>>>>> yes, where data_interval_start = data_interval_end from CronTimetable. >>>>>>> >>>>>>> Does anyone have any better names than TimeDeltaTimetable, >>>>>>>> DataTimetable, and CronTimetable? (We can probably change these names >>>>>>>> right >>>>>>>> up until release, so not important to get this correct *now*.) >>>>>>> >>>>>>> >>>>>>> No strong opinion here. Just an alternate suggestion can >>>>>>> be TimeDeltaSchedule, DataSchedule and CronSchedule >>>>>>> >>>>>>> >>>>>>>> Should I try to roll AIP-30 >>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> >>>>>>>> in >>>>>>>> to this, or should we make that a future addition? (My vote is for >>>>>>>> future >>>>>>>> addition) >>>>>>> >>>>>>> >>>>>>> I would vote for Future addition too. >>>>>>> >>>>>>> Regards, >>>>>>> Kaxil >>>>>>> >>>>>>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> I think, yes, AIP-35 or something like it would happily co-exist >>>>>>>> with this proposal. >>>>>>>> >>>>>>>> @Daniel <[email protected]> and I have been discussing this a >>>>>>>> bit on Slack, and one of the questions he asked was if the concept of >>>>>>>> data_interval should be moved from DagRun as James and I suggested >>>>>>>> down on >>>>>>>> to the individual task: >>>>>>>> >>>>>>>> suppose i have a new dag hitting 5 api endpoints and pulling data >>>>>>>> to s3. suppose that yesterday 4 of them succeeded but one failed. 
>>>>>>>> today, 4 of them should pull from yesterday. but the one that failed should pull >>>>>>>> from 2 days back. so even though these normally have the same interval, >>>>>>>> today they should not. >>>>>>>> >>>>>>>> >>>>>>>> My view on this is twofold: one, this should primarily be handled >>>>>>>> by retries on the task, and secondly, having different TaskInstances in the >>>>>>>> same DagRun have different data intervals would be much harder to reason >>>>>>>> about/design the UI around, so for those reasons I still think the interval >>>>>>>> should be a DagRun-level concept. >>>>>>>> >>>>>>>> (He has a stalled AIP-30 where he proposed something to address >>>>>>>> this kind of "watermark" case, which we might pick up next after this AIP >>>>>>>> is complete) >>>>>>>> >>>>>>>> One thing we might want to do is extend the interface of >>>>>>>> AbstractTimetable to be able to add/update parameters in the context dict, >>>>>>>> so the interface could become this:
>>>>>>>>
>>>>>>>> class AbstractTimetable(ABC):
>>>>>>>>     @abstractmethod
>>>>>>>>     def next_dagrun_info(
>>>>>>>>         self,
>>>>>>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>>>>>>         session: Session,
>>>>>>>>     ) -> Optional[DagRunInfo]:
>>>>>>>>         """
>>>>>>>>         Get information about the next DagRun of this dag after
>>>>>>>>         ``date_last_automated_dagrun`` -- the execution date, and
>>>>>>>>         the earliest it could be scheduled.
>>>>>>>>
>>>>>>>>         :param date_last_automated_dagrun: The max(execution_date)
>>>>>>>>             of existing "automated" DagRuns for this dag (scheduled
>>>>>>>>             or backfill, but not manual)
>>>>>>>>         """
>>>>>>>>
>>>>>>>>     @abstractmethod
>>>>>>>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>>>>>>>>         """
>>>>>>>>         Update or set new context variables to become available in
>>>>>>>>         task templates and operators.
>>>>>>>>         """
>>>>>>>>
>>>>>>>> What do people think of this? Worth it?
Too complex to reason about >>>>>>>> what context variables might exist as a result? >>>>>>>> >>>>>>>> *Outstanding questions*: >>>>>>>> >>>>>>>> - Should "interval-less DAGs" (ones using "CronTimetable" in my >>>>>>>> proposal vs "DataTimetable") have data_interval_start and end available in >>>>>>>> the context? >>>>>>>> - Does anyone have any better names than TimeDeltaTimetable, >>>>>>>> DataTimetable, and CronTimetable? (We can probably change these names right >>>>>>>> up until release, so it is not important to get this correct *now*.) >>>>>>>> - Should I try to roll AIP-30 >>>>>>>> >>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> >>>>>>>> into this, or should we make that a future addition? (My vote is for >>>>>>>> future addition.) >>>>>>>> >>>>>>>> >>>>>>>> I'd like to start voting on this AIP next week (probably on >>>>>>>> Tuesday) as I think this will be a powerful feature that eases confusion for >>>>>>>> new users. >>>>>>>> >>>>>>>> -Ash >>>>>>>> >>>>>>>> >>>>>>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based >>>>>>>> Scheduling To Airflow"? >>>>>>>> I think streaming was also discussed there (though it wasn't really >>>>>>>> the use case). >>>>>>>> >>>>>>>> >>>>>>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <[email protected]>: >>>>>>>> >>>>>>>> Hi Kevin, >>>>>>>> >>>>>>>> Interesting idea. My original idea was actually that "interval-less >>>>>>>> DAGs" (i.e. ones where it's just "run at this time") would not have >>>>>>>> data_interval_start or end, but (while drafting the AIP) we decided that it >>>>>>>> was probably "easier" if those values were always datetimes. >>>>>>>> >>>>>>>> That said, I think having the DB model have those values be >>>>>>>> nullable would future-proof it without needing another migration to >>>>>>>> change it.
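[For concreteness, a minimal TimeDeltaTimetable along the lines of the AbstractTimetable interface quoted earlier in the thread might look like the sketch below. The DagRunInfo stand-in and the simplified signature -- no session argument, plain datetime instead of pendulum.DateTime -- are assumptions made here for a self-contained example, not the proposal's actual code.]

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class DagRunInfo:
    """Simplified stand-in for the DagRunInfo type named in the interface."""
    data_interval_start: datetime
    data_interval_end: datetime


class TimeDeltaTimetable:
    """Sketch of a TimeDeltaTimetable: one run per fixed `delta`.

    Illustration only -- the proposed signature also takes a session and
    uses pendulum.DateTime rather than plain datetime.
    """

    def __init__(self, delta: timedelta, start_date: datetime):
        self.delta = delta
        self.start_date = start_date

    def next_dagrun_info(
        self, date_last_automated_dagrun: Optional[datetime]
    ) -> Optional[DagRunInfo]:
        # First run covers [start_date, start_date + delta); later runs
        # slide the data interval forward one delta at a time.
        if date_last_automated_dagrun is None:
            start = self.start_date
        else:
            start = date_last_automated_dagrun + self.delta
        return DagRunInfo(start, start + self.delta)
```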
Do you think this is worth doing now? >>>>>>>> >>>>>>>> I haven't (yet! It's on my list) spent any significant time >>>>>>>> thinking about how to make Airflow play nicely with streaming jobs. If >>>>>>>> anyone else has ideas here please share them >>>>>>>> >>>>>>>> -ash >>>>>>>> >>>>>>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>> Hi Ash and James, >>>>>>>> >>>>>>>> This is an exciting move. What do you think about using this >>>>>>>> opportunity to extend Airflow's support to streaming like use cases? >>>>>>>> I.e >>>>>>>> DAGs/tasks that want to run forever like a service. For such use cases, >>>>>>>> schedule interval might not be meaningful, then do we want to make the >>>>>>>> date >>>>>>>> interval param optional to DagRun and task instances? That sounds like >>>>>>>> a >>>>>>>> pretty major change to the underlying model of Airflow, but this AIP >>>>>>>> is so >>>>>>>> far the best opportunity I saw that can level up Airflow's support for >>>>>>>> streaming/service use cases. >>>>>>>> >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Kevin Y >>>>>>>> >>>>>>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>> Very excited to see this proposal come through and love the >>>>>>>> direction this has gone. >>>>>>>> >>>>>>>> Couple comments... >>>>>>>> >>>>>>>> *Tree view / Data completeness view* >>>>>>>> >>>>>>>> When you design your tasks with the canonical idempotence pattern, >>>>>>>> the tree view shows you both data completeness and task execution >>>>>>>> history >>>>>>>> (success / failure etc). >>>>>>>> >>>>>>>> When you don't use that pattern (which is my general preference), >>>>>>>> tree view is only task execution history. >>>>>>>> >>>>>>>> This change has the potential to unlock a data completeness view >>>>>>>> for canonical tasks. It's possible that the "data completeness view" >>>>>>>> can >>>>>>>> simply be the tree view. I.e. 
somehow it can use these new classes to >>>>>>>> know >>>>>>>> what data was successfully filled and what data wasn't. >>>>>>>> >>>>>>>> To the extent we like the idea of either extending / plugging / >>>>>>>> modifying tree view, or adding a distinct data completeness view, we >>>>>>>> might >>>>>>>> want to anticipate the needs of that in this change. And maybe no >>>>>>>> alteration to the proposal would be needed but just want to throw the >>>>>>>> idea >>>>>>>> out there. >>>>>>>> >>>>>>>> *Watermark workflow / incremental processing* >>>>>>>> >>>>>>>> A common pattern in data warehousing is pulling data incrementally >>>>>>>> from a source. >>>>>>>> >>>>>>>> A standard way to achieve this is at the start of the task, select >>>>>>>> max `updated_at` in source table and hold on to that value for a >>>>>>>> minute. >>>>>>>> This is your tentative new high watermark. >>>>>>>> Now it's time to pull your data. If your task ran before, grab >>>>>>>> last high watermark. If not, use initial load value. >>>>>>>> If successful, update high watermark. >>>>>>>> >>>>>>>> On my team we implemented this with a stateful tasks / stateful >>>>>>>> processes concept (there's a dormant draft AIP here >>>>>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) >>>>>>>> and a WatermarkOperator that handled the boilerplate*. >>>>>>>> >>>>>>>> Again here, I don't have a specific suggestion at this moment. But >>>>>>>> I wanted to articulate this workflow because it is common and it wasn't >>>>>>>> immediately obvious to me in reading AIP-39 how I would use it to >>>>>>>> implement >>>>>>>> it. >>>>>>>> >>>>>>>> AIP-39 makes airflow more data-aware. So if it can support this >>>>>>>> kind of workflow that's great. @Ash Berlin-Taylor >>>>>>>> <[email protected]> do you have thoughts on how it might be >>>>>>>> compatible with this kind of thing as is? 
>>>>>>>> >>>>>>>> --- >>>>>>>> >>>>>>>> * The base operator is designed so that subclasses only need to >>>>>>>> implement two methods: >>>>>>>> - `get_high_watermark`: produce the tentative new high watermark >>>>>>>> - `watermark_execute`: analogous to implementing poke in a >>>>>>>> sensor, this is where your work is done. `execute` is left to the base >>>>>>>> class, and it orchestrates (1) getting the last high watermark or initial >>>>>>>> load value and (2) updating the new high watermark if the job is successful.
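[A rough sketch of the base-operator boilerplate described in that footnote. This is hypothetical: the class below, its dict-based state store, and the simplified `execute` are stand-ins invented here -- real state persistence is what AIP-30 would provide.]

```python
from typing import Any


class WatermarkOperator:
    """Sketch of the watermark boilerplate described above.

    Subclasses implement get_high_watermark() and watermark_execute();
    the base class fetches the last watermark (or an initial load value),
    runs the work, and advances the watermark only on success. The dict
    here is a stand-in for whatever persistence AIP-30 would provide.
    """

    initial_load_value: Any = None
    _state: dict = {}  # hypothetical stand-in for persisted task state

    def __init__(self, task_id: str):
        self.task_id = task_id

    def get_high_watermark(self) -> Any:
        """Tentative new high watermark, e.g. max(updated_at) in the source."""
        raise NotImplementedError

    def watermark_execute(self, low: Any, high: Any) -> None:
        """Do the incremental pull for the (low, high] range."""
        raise NotImplementedError

    def execute(self) -> None:
        high = self.get_high_watermark()
        # Last successful watermark, or the initial load value on first run
        low = self._state.get(self.task_id, self.initial_load_value)
        self.watermark_execute(low, high)
        # Only reached on success: advance the watermark
        self._state[self.task_id] = high
```

On failure, `execute` raises before the final line, so the old watermark is kept and the next run re-pulls from it -- matching the "the one that failed should pull from 2 days back" scenario earlier in the thread.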
