Respectfully, I strongly disagree with the renaming of Timetable to Schedule. Schedule and Scheduler aren't meaningfully different, which can lead to a lot of confusion. Even as a native English speaker, and someone who works on Airflow full time, I routinely need to ask for clarification about what schedule-related concept someone is referring to. I foresee Schedule and Scheduler as two distinct yet closely related concepts becoming a major source of confusion.
If folks dislike Timetable, we could certainly change to something else, but
let's not use something so similar to existing Airflow classes.

-James

On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <[email protected]> wrote:

> Summary of changes so far on the AIP:
>
> My proposed rename of DagRun.execution_date is now DagRun.schedule_date
> (previously I had proposed run_date. Thanks dstandish!)
>
> Timetable classes are renamed to Schedule classes (CronSchedule etc.);
> similarly, the DAG argument is now schedule (reminder: schedule_interval
> will not be removed or deprecated, and will still be the way to use
> "simple" expressions).
>
> -ash
>
> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <[email protected]> wrote:
>
> We could change Timetable to Schedule -- that would mean the DAG arg
> becomes `schedule=CronSchedule(...)` -- a bit close to the current
> `schedule_interval`, but I think a clear enough difference.
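> For concreteness, a sketch of what a DAG definition might look like under
> this proposal (the import path is hypothetical; only the `schedule`
> argument and the `CronSchedule` name are as proposed here):
>
>     from datetime import datetime
>
>     from airflow import DAG
>     from airflow.schedules import CronSchedule  # hypothetical import path
>
>     with DAG(
>         dag_id="example_daily_report",
>         # replaces schedule_interval="0 0 * * *"
>         schedule=CronSchedule("0 0 * * *"),
>         start_date=datetime(2021, 1, 1),
>     ) as dag:
>         ...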
>
> I do like the name, but my one worry with "schedule" is that Scheduler and
> Schedule are very similar, and might be confused with each other by
> non-native English speakers? (I defer to others' judgment here, as this is
> not something I can experience myself.)
>
> @Kevin Yang <[email protected]> @Daniel Standish <[email protected]> any
> final input on this AIP?
>
> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <[email protected]> wrote:
>
> Hi Ash and all,
>
>> What do people think of this? Worth it? Too complex to reason about what
>> context variables might exist as a result?
>
> I think I wouldn't worry about it right now, or maybe not as part of this
> AIP. Currently, in one of the GitHub issues, a user mentioned that it is
> not straightforward to know what is inside the context dictionary:
> https://github.com/apache/airflow/issues/14396. So maybe we can tackle
> this issue separately once the AbstractTimetable is built.
>
>> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs
>> "DataTimetable") have data_interval_start and end available in the context?
>
> Hmm, I would say no, but then that contradicts my suggestion to leave the
> context dict out of this AIP. If we are going to use it in the scheduler,
> then yes, with data_interval_start = data_interval_end for CronTimetable.
>
>> Does anyone have any better names than TimeDeltaTimetable, DataTimetable,
>> and CronTimetable? (We can probably change these names right up until
>> release, so not important to get this correct *now*.)
>
> No strong opinion here. Just an alternative suggestion: these could
> be TimeDeltaSchedule, DataSchedule, and CronSchedule.
>
>> Should I try to roll AIP-30
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>> into this, or should we make that a future addition? (My vote is for
>> future addition)
>
> I would vote for future addition too.
>
> Regards,
> Kaxil
>
> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <[email protected]> wrote:
>
>> I think, yes, AIP-35 or something like it would happily co-exist with
>> this proposal.
>>
>> @Daniel <[email protected]> and I have been discussing this a bit on
>> Slack, and one of the questions he asked was whether the concept of
>> data_interval should be moved from DagRun, as James and I suggested, down
>> onto the individual task:
>>
>> suppose I have a new dag hitting 5 api endpoints and pulling data to s3.
>> suppose that yesterday 4 of them succeeded but one failed. today, 4 of
>> them should pull from yesterday, but the one that failed should pull from
>> 2 days back. so even though these normally have the same interval, today
>> they should not.
>>
>> My view on this is twofold: one, this should primarily be handled by
>> retries on the task, and secondly, having different TaskInstances in the
>> same DagRun with different data intervals would be much harder to reason
>> about/design the UI around, so for those reasons I still think the
>> interval should be a DagRun-level concept.
>>
>> (He has a stalled AIP-30 where he proposed something to address this kind
>> of "watermark" case, which we might pick up next after this AIP is
>> complete.)
>>
>> One thing we might want to do is extend the interface of
>> AbstractTimetable to be able to add/update parameters in the context dict,
>> so the interface could become this:
>>
>>     class AbstractTimetable(ABC):
>>         @abstractmethod
>>         def next_dagrun_info(
>>             self,
>>             date_last_automated_dagrun: Optional[pendulum.DateTime],
>>             session: Session,
>>         ) -> Optional[DagRunInfo]:
>>             """
>>             Get information about the next DagRun of this dag after
>>             ``date_last_automated_dagrun`` -- the execution date, and the
>>             earliest it could be scheduled.
>>
>>             :param date_last_automated_dagrun: The max(execution_date) of
>>                 existing "automated" DagRuns for this dag (scheduled or
>>                 backfill, but not manual)
>>             """
>>
>>         @abstractmethod
>>         def set_context_variables(
>>             self, dagrun: DagRun, context: Dict[str, Any]
>>         ) -> None:
>>             """
>>             Update or set new context variables to become available in
>>             task templates and operators.
>>             """
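>>
>> As a rough sketch of how a concrete class might then implement this (the
>> DagRunInfo field and the delta logic below are illustrative only, not
>> part of the AIP text):
>>
>>     import datetime
>>     import pendulum
>>
>>     class TimeDeltaTimetable(AbstractTimetable):  # interface as above
>>         """Run the DAG at a fixed delta after the previous automated run."""
>>
>>         def __init__(self, delta: datetime.timedelta, start_date: pendulum.DateTime):
>>             self.delta = delta
>>             self.start_date = start_date
>>
>>         def next_dagrun_info(self, date_last_automated_dagrun, session):
>>             # First automated run starts at start_date; later runs follow
>>             # on from the previous one.
>>             if date_last_automated_dagrun is None:
>>                 next_date = self.start_date
>>             else:
>>                 next_date = date_last_automated_dagrun + self.delta
>>             return DagRunInfo(schedule_date=next_date)  # illustrative field name
>>
>>         def set_context_variables(self, dagrun, context):
>>             # e.g. expose the interval length to templates/operators
>>             context["data_interval_delta"] = self.delta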
>>
>> What do people think of this? Worth it? Too complex to reason about what
>> context variables might exist as a result?
>>
>> *Outstanding questions*:
>>
>> - Should "interval-less DAGs" (ones using "CronTimetable" in my proposal
>> vs "DataTimetable") have data_interval_start and end available in the
>> context?
>> - Does anyone have any better names than TimeDeltaTimetable,
>> DataTimetable, and CronTimetable? (We can probably change these names
>> right up until release, so it is not important to get this correct *now*.)
>> - Should I try to roll AIP-30
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>> into this, or should we make that a future addition? (My vote is for
>> future addition.)
>>
>> I'd like to start voting on this AIP next week (probably on Tuesday), as
>> I think this will be a powerful feature that eases confusion for new
>> users.
>>
>> -Ash
>>
>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <[email protected]> wrote:
>>
>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To
>> Airflow"? I think streaming was also discussed there (though it wasn't
>> really the use case).
>>
>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <[email protected]>:
>>
>> Hi Kevin,
>>
>> Interesting idea. My original idea was actually that "interval-less DAGs"
>> (i.e. ones where it's just "run at this time") would not have
>> data_interval_start or end, but (while drafting the AIP) we decided that
>> it was probably "easier" if those values were always datetimes.
>>
>> That said, I think making those values nullable in the DB model would
>> future-proof it without needing another migration to change it later. Do
>> you think this is worth doing now?
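>>
>> Concretely, that would just mean something like the following on the
>> DagRun model (a sketch using SQLAlchemy's Column and Airflow's
>> UtcDateTime type; the actual columns and migration are TBD):
>>
>>     data_interval_start = Column(UtcDateTime, nullable=True)
>>     data_interval_end = Column(UtcDateTime, nullable=True)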
>>
>> I haven't (yet! It's on my list) spent any significant time thinking
>> about how to make Airflow play nicely with streaming jobs. If anyone else
>> has ideas here, please share them.
>>
>> -ash
>>
>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <[email protected]> wrote:
>>
>> Hi Ash and James,
>>
>> This is an exciting move. What do you think about using this opportunity
>> to extend Airflow's support to streaming-like use cases, i.e. DAGs/tasks
>> that want to run forever, like a service? For such use cases, a schedule
>> interval might not be meaningful, so do we want to make the data interval
>> params optional on DagRun and task instances? That sounds like a pretty
>> major change to the underlying model of Airflow, but this AIP is so far
>> the best opportunity I've seen to level up Airflow's support for
>> streaming/service use cases.
>>
>> Cheers,
>> Kevin Y
>>
>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <[email protected]>
>> wrote:
>>
>> Very excited to see this proposal come through, and I love the direction
>> this has gone.
>>
>> A couple of comments...
>>
>> *Tree view / Data completeness view*
>>
>> When you design your tasks with the canonical idempotence pattern, the
>> tree view shows you both data completeness and task execution history
>> (success / failure etc).
>>
>> When you don't use that pattern (which is my general preference), tree
>> view is only task execution history.
>>
>> This change has the potential to unlock a data completeness view for
>> canonical tasks. It's possible that the "data completeness view" can
>> simply be the tree view, i.e. somehow it can use these new classes to
>> know what data was successfully filled and what data wasn't.
>>
>> To the extent we like the idea of either extending / plugging / modifying
>> the tree view, or adding a distinct data completeness view, we might want
>> to anticipate the needs of that in this change. Maybe no alteration to
>> the proposal would be needed, but I just want to throw the idea out there.
>>
>> *Watermark workflow / incremental processing*
>>
>> A common pattern in data warehousing is pulling data incrementally from a
>> source. A standard way to achieve this is:
>>
>> 1. At the start of the task, select max `updated_at` in the source table
>> and hold on to that value for a minute. This is your tentative new high
>> watermark.
>> 2. Now it's time to pull your data. If your task ran before, grab the
>> last high watermark. If not, use the initial load value.
>> 3. If successful, update the high watermark.
>>
>> On my team we implemented this with a stateful tasks / stateful processes
>> concept (there's a dormant draft AIP here
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>> and a WatermarkOperator that handled the boilerplate*.
>>
>> Again here, I don't have a specific suggestion at this moment. But I
>> wanted to articulate this workflow because it is common, and it wasn't
>> immediately obvious to me in reading AIP-39 how I would use it to
>> implement this pattern.
>>
>> AIP-39 makes Airflow more data-aware, so if it can support this kind of
>> workflow, that's great. @Ash Berlin-Taylor <[email protected]> do you
>> have thoughts on how it might be compatible with this kind of thing as is?
>>
>> ---
>>
>> * The base operator is designed so that subclasses only need to implement
>> two methods:
>> - `get_high_watermark`: produce the tentative new high watermark
>> - `watermark_execute`: analogous to implementing `poke` in a sensor;
>> this is where your work is done. `execute` is left to the base class, and
>> it orchestrates (1) getting the last high watermark or initial load value
>> and (2) updating the new high watermark if the job was successful.
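>>
>> A minimal sketch of that base class, to make the shape concrete (the
>> get_stored_watermark/set_stored_watermark state helpers are hypothetical
>> -- persisting that state is exactly what AIP-30 would provide):
>>
>>     from airflow.models import BaseOperator
>>
>>     class WatermarkOperator(BaseOperator):
>>         initial_load_value = None  # subclasses may override
>>
>>         def get_high_watermark(self, context):
>>             """Produce the tentative new high watermark, e.g. max updated_at in source."""
>>             raise NotImplementedError
>>
>>         def watermark_execute(self, context, low, high):
>>             """Do the actual work, e.g. pull rows where low < updated_at <= high."""
>>             raise NotImplementedError
>>
>>         def execute(self, context):
>>             high = self.get_high_watermark(context)
>>             # hypothetical state helper -- this is what AIP-30 would supply
>>             low = self.get_stored_watermark() or self.initial_load_value
>>             self.watermark_execute(context, low, high)
>>             # only reached if watermark_execute raised no exception
>>             self.set_stored_watermark(high)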
