I took a stab at implementing this timetable. (I didn't write most of the timetable code, that was TP Chung, but I reviewed most or all of the PRs, so I know this code fairly well.)

from typing import Optional

import pendulum
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class WeekdayTimetable(Timetable):
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        earliest: Optional[pendulum.DateTime] = None
        if last_automated_data_interval:
            # Continue from where the previous automated interval ended.
            earliest = pendulum.instance(last_automated_data_interval.end)
        elif restriction:
            if not restriction.catchup:
                # Since we are daily, start of today is the earliest
                earliest = pendulum.today()
            elif restriction.earliest:
                earliest = pendulum.instance(restriction.earliest)

        if not earliest:
            return None

        interval = self._next_weekday_interval(earliest)
        return DagRunInfo(run_after=interval.end, data_interval=interval)

    def infer_manual_data_interval(self, *, run_after: pendulum.DateTime) -> DataInterval:
        return self._next_weekday_interval(run_after)

    def _next_weekday_interval(self, day: pendulum.DateTime) -> DataInterval:
        # Roll Saturday and Sunday forward to midnight on the following Monday.
        if day.day_of_week in (pendulum.SATURDAY, pendulum.SUNDAY):
            day = day.next(pendulum.MONDAY)

        end = day.start_of('day').add(days=1)
        return DataInterval(start=day, end=end)

The output of testing it out:

Friday 21 00:00 Saturday 00:00
Monday 24 00:00 Tuesday 00:00
Tuesday 25 00:00 Wednesday 00:00
Wednesday 26 00:00 Thursday 00:00
Thursday 27 00:00 Friday 00:00
Friday 28 00:00 Saturday 00:00
Monday 31 00:00 Tuesday 00:00
Manual Wednesday 23 13:57 Thursday 00:00
No catchup Wednesday 23 February 00:00 Thursday 00:00
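
For anyone who wants to sanity-check the catch-up sequence without an Airflow install, here is a rough standard-library sketch of the same rule (skip weekends, one-day interval ending at the next midnight). The function names are made up for illustration and it deliberately ignores time zones, so treat it as an approximation of the timetable rather than a replacement for it.

```python
from datetime import datetime, timedelta


def next_weekday_interval(day: datetime) -> tuple[datetime, datetime]:
    """Return (start, end) of the one-day interval for the first weekday at or after `day`."""
    # datetime.weekday(): Monday is 0, Saturday is 5, Sunday is 6.
    while day.weekday() >= 5:
        # Move to midnight of the following day until we leave the weekend.
        day = (day + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    end = day.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
    return day, end


def catchup_intervals(start: datetime, count: int) -> list[tuple[datetime, datetime]]:
    """Chain intervals so that each one starts from the end of the previous one."""
    intervals = []
    earliest = start
    for _ in range(count):
        interval = next_weekday_interval(earliest)
        intervals.append(interval)
        earliest = interval[1]
    return intervals


for begin, end in catchup_intervals(datetime(2022, 1, 21), 7):
    print(begin.strftime("%A %d %H:%M"), end.strftime("%A %H:%M"))
```

Starting from 2022-01-21 this should walk through the same seven weekdays as the catch-up lines above.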


And my test code:

import pendulum
from airflow import DAG

dag2 = DAG(dag_id='timetable-test', timetable=WeekdayTimetable(), start_date=pendulum.DateTime(2022, 1, 21))

info = None
dag2.catchup = True
for _ in range(7):
    info = dag2.next_dagrun_info(info.data_interval if info else None)
    if info:
        print(info.data_interval.start.format('dddd DD HH:mm'), info.data_interval.end.format('dddd HH:mm'))
    else:
        break

data_interval = dag2.timetable.infer_manual_data_interval(run_after=pendulum.now())
print("Manual", data_interval.start.format('dddd DD HH:mm'), data_interval.end.format('dddd HH:mm'))


dag3 = DAG(dag_id='timetable-test', timetable=WeekdayTimetable(), start_date=pendulum.DateTime(2022, 1, 21), catchup=False)
info = dag3.next_dagrun_info(None)
if info:
   print(
       "No catchup",
       info.data_interval.start.format('dddd DD MMMM HH:mm'),
       info.data_interval.end.format('dddd HH:mm'),
   )
else:
   print("none")


On Wed, Feb 23 2022 at 13:39:24 +0000, Ash Berlin-Taylor <[email protected]> wrote:
One comment: Please don't use the phrase "execution time" as it's not clear which of the possible meanings is intended (is it the old execution_date? Is it the time the dagrun actually starts?)

Backfilling is not out of scope for a timetable at all. If I run `airflow dags backfill mydagid --start-date 2020-01-01 --end-date 2021-06-30` how many DagRuns are created and what are logical dates/intervals of them?

> The scheduler provides the most recent execution time as input and
> creates a dagrun once the current time is at or after the returned
> earliest runtime for the next execution time.

And in case of the very first time a Dag is enabled? I guess it could pass the dag start_date here instead?

(Implementation detail: It stores the info in 4 columns in the DagModel table, next_dagrun, next_dagrun_data_interval_start and _end, and next_dagrun_create_after, so that the creation of the DagRun can just be done as a DB lookup. Doesn't materially change your statement.)
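
To make the "just a DB lookup" point concrete, here is a toy sketch using sqlite3. The table is a simplified, hypothetical stand-in and not Airflow's real schema or query; the only idea it tries to show is that the timetable result is precomputed and stored, so finding DAGs that need a run becomes a comparison against the current time.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified stand-in for the DagModel table; not Airflow's real schema.
conn.execute(
    """
    CREATE TABLE dag (
        dag_id TEXT PRIMARY KEY,
        next_dagrun TEXT,
        next_dagrun_data_interval_start TEXT,
        next_dagrun_data_interval_end TEXT,
        next_dagrun_create_after TEXT
    )
    """
)
conn.executemany(
    "INSERT INTO dag VALUES (?, ?, ?, ?, ?)",
    [
        ("weekday", "2022-02-22T00:00:00", "2022-02-22T00:00:00",
         "2022-02-23T00:00:00", "2022-02-23T00:00:00"),
        ("monthly", "2022-02-01T00:00:00", "2022-02-01T00:00:00",
         "2022-03-01T00:00:00", "2022-03-01T00:00:00"),
    ],
)


def dags_needing_runs(now: datetime) -> list[str]:
    """Return ids of DAGs whose precomputed create-after time has passed."""
    # ISO 8601 strings of equal precision sort chronologically,
    # so a plain string comparison is enough for this toy example.
    rows = conn.execute(
        "SELECT dag_id FROM dag WHERE next_dagrun_create_after <= ? ORDER BY dag_id",
        (now.isoformat(),),
    )
    return [row[0] for row in rows]


print(dags_needing_runs(datetime(2022, 2, 23, 13, 39)))
```

With the two sample rows, only the "weekday" DAG is due at the time of this email; the timetable itself is not consulted during the lookup.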

> - Manual runs are trivial because there is no (2) or (3). In fact, for
> most DAGs (which care about a data interval), there should probably
> not be a play button at all.

If manually triggered dag runs are out of scope, what are the data_interval_end and data_interval_start values (in the context/templates) for a manually triggered run?

It's not possible from the UI currently, but `airflow dags trigger` can be given a specific execution date. Maybe this should be extended to take the data interval too, but for a user-friendly CLI, inferring the interval when it isn't provided makes it easier to use. (A timetable could choose to return an error from the infer method.)

Next question: how do we go about implementing and releasing this? Now that it's been in a release we can't just break backcompat, so either we need to make this a "Base" template that handles most of this logic, or we need to introspect and tell old Timetables from new.
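
As a rough illustration of the "introspect" option, the sketch below checks whether a subclass overrides a new hook. Everything here is hypothetical: the stand-in `Timetable` class is not Airflow's, and `next_execution_time` is an invented name for the proposed method. It only shows that old and new implementations can be told apart without changing the old ones.

```python
class Timetable:
    """Hypothetical base class carrying both the released hook and a proposed one."""

    def next_dagrun_info(self, *, last_automated_data_interval, restriction):
        raise NotImplementedError

    def next_execution_time(self, after):
        # Invented name for the proposed "next execution time after T" hook.
        raise NotImplementedError


def uses_new_interface(timetable: Timetable) -> bool:
    """True if the timetable's class overrides the (hypothetical) new hook."""
    # Looking the method up on the class returns the plain function object,
    # so an identity check tells us whether a subclass replaced it.
    return type(timetable).next_execution_time is not Timetable.next_execution_time


class OldStyle(Timetable):
    def next_dagrun_info(self, *, last_automated_data_interval, restriction):
        return None


class NewStyle(Timetable):
    def next_execution_time(self, after):
        return after


print(uses_new_interface(OldStyle()), uses_new_interface(NewStyle()))
```

The "Base" template alternative would instead implement `next_dagrun_info` once in a shared parent in terms of the new hooks, so that no introspection is needed.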

You are probably right about earliest/latest and we might be able to do away with that part of the interface.

-ash

On Wed, Feb 23 2022 at 12:44:10 +0000, Malthe <[email protected]> wrote:
Hi all,

I was going to take a stab at adding some custom timetable
functionality to address two requirements:

1. The ability to temporarily switch to an alternative timetable for
an interim period.
2. The ability to introduce relatively custom holiday scheduling which
is well outside the functionality of cron expressions.

I could add that while (1) could be done using Python at the
DAG-level, I would like to use the timetable interface to allow
accurate predictions into the future. That's for another post, but to
give some context, I have floated a proposal on Slack to show
tentative scheduled days in the calendar view using a "grey dot"
indication to denote that we expect at least one scheduled run (see
attachment).

Now, I looked at the "afterwork" example to see what's required in
order to implement a custom timetable. And I must admit that I find it
rather daunting given that it's so easy to express what that timetable
is about:

     MON-FRI, daily, run after midnight

Intuitively, that should be a couple of lines of Python code. It ends
up being quite a lot more than that and that's due to the interface
that must be implemented.

I think the correct timetable interface is:

1. Return the next execution time that's strictly (">") after a particular time.
2. Return the earliest runtime for a particular execution time,
accounting for any grace period.
3. Return context metadata for this execution.

The scheduler provides the most recent execution time as input and
creates a dagrun once the current time is at or after the returned
earliest runtime for the next execution time.

Considering again the "afterwork" example, with a grace period of 5
minutes, we'd expect a dagrun shortly after 5 minutes past midnight
(of Monday, Tuesday, and so forth up until midnight after Friday). The
execution time _is_ the time where a given task runs (minus the grace
period).

The reasoning behind (3) is that I consider the notion of "data
interval" to be metadata since this is only a concern for the task
implementation. For example, the scheduler does not need to worry
about this at all.
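
The three-part interface could be sketched roughly as follows for the "afterwork" case. This is a minimal, hypothetical sketch and not existing Airflow API: the class and method names are invented, times are naive datetimes, the execution time is taken to be the midnight that closes each Monday-to-Friday workday, and the scheduler rule is read as "create the run once the current time has reached the earliest runtime".

```python
from datetime import datetime, timedelta

GRACE_PERIOD = timedelta(minutes=5)  # the 5-minute grace period from the example


class AfterWorkTimetable:
    """Hypothetical sketch of the three-method interface; all names are invented."""

    def next_execution_time(self, after: datetime) -> datetime:
        """(1) First midnight strictly after `after` that closes a Mon-Fri workday."""
        candidate = after.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        # A midnight closing a workday falls on Tuesday..Saturday;
        # datetime.weekday() numbers Monday as 0 and Sunday as 6.
        while candidate.weekday() not in (1, 2, 3, 4, 5):
            candidate += timedelta(days=1)
        return candidate

    def earliest_runtime(self, execution_time: datetime) -> datetime:
        """(2) When the run may start, including the grace period."""
        return execution_time + GRACE_PERIOD

    def context(self, execution_time: datetime) -> dict:
        """(3) Metadata for the task: the workday that just ended, as a data interval."""
        return {
            "data_interval_start": execution_time - timedelta(days=1),
            "data_interval_end": execution_time,
        }


def should_create_run(timetable: AfterWorkTimetable, last_execution: datetime, now: datetime) -> bool:
    """Scheduler side: create a run once `now` has reached the earliest runtime."""
    upcoming = timetable.next_execution_time(last_execution)
    return now >= timetable.earliest_runtime(upcoming)


tt = AfterWorkTimetable()
last = datetime(2022, 2, 23)  # Wednesday 00:00, the close of Tuesday's workday
print(tt.next_execution_time(last), should_create_run(tt, last, datetime(2022, 2, 24, 0, 5)))
```

Note that the scheduler-side function only touches (1) and (2); the data interval in (3) is passed along to the task and never inspected for scheduling.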

Other concerns:

- Backfilling is out of scope for the timetable interface.
- Time restrictions (i.e. start and end date) are likewise out of
scope. The scheduler knows when the DAG starts and ends and doesn't
need help from the timetable here.
- Manual runs are trivial because there is no (2) or (3). In fact, for
most DAGs (which care about a data interval), there should probably
not be a play button at all.

I didn't complete the exercise, but it stands to reason that the
"afterwork" example would be short and simple with the interface
outlined above.

Thanks
