I strongly agree with reducing Airflow's complexity, and the plan to serialize DAGs by default makes a lot of sense. So wouldn't it make sense to also serialize the schedule of a DAG? E.g. provide a table of execution dates, so that the scheduler can keep the upcoming ones cached and periodically check for updates, and give the user a way to update the table if they need to.
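To make the idea a bit more concrete, here is a rough sketch in plain Python of what I mean by a table of execution dates. It is not Airflow code: `ScheduleTable` and its method names are made up for illustration, and a sorted in-memory list stands in for the database table and the scheduler's cache.

```python
from bisect import bisect_left, bisect_right
from datetime import datetime
from typing import Iterable, List, Optional


class ScheduleTable:
    """Hypothetical stand-in for a serialized table of execution dates."""

    def __init__(self, execution_dates: Iterable[datetime]) -> None:
        # Sorted and de-duplicated so lookups can use binary search.
        # Assumes all datetimes are either naive or timezone-aware, not mixed.
        self._dates: List[datetime] = sorted(set(execution_dates))

    def following(self, dttm: datetime) -> Optional[datetime]:
        """Return the first execution date strictly after dttm, if any."""
        i = bisect_right(self._dates, dttm)
        return self._dates[i] if i < len(self._dates) else None

    def previous(self, dttm: datetime) -> Optional[datetime]:
        """Return the last execution date strictly before dttm, if any."""
        i = bisect_left(self._dates, dttm)
        return self._dates[i - 1] if i > 0 else None

    def update(self, execution_dates: Iterable[datetime]) -> None:
        """Replace the table contents, e.g. after the user edits the schedule."""
        self._dates = sorted(set(execution_dates))
```

With something like this the scheduler only ever reads dates; no user code has to run to decide when the next run is due.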
Outside of customization, Airflow's current schedule definition solution is fairly weak. For example, the very common use case of following a country's working day calendar is actually pretty tricky for a new user to implement in Airflow. Say you want to follow a fairly normal East Asian calendar; the currently recommended way to do this in Airflow is the following:

1. Create a DAG scheduled Monday - Friday
2. Write a Holiday Branch / Short Circuit Operator to skip tasks on a holiday, put it inside your own custom Airflow Operators package and distribute it with every Airflow instance
3. Write a function to calculate the actual "{{ next_ds }}", as the one Airflow provides is normalized to UTC and is therefore usually 1 day out for timezone-aware DAGs in Asia, and put it in the Macro Plugins
4. Write functions that can calculate the actual "{{ prev_ds }}" and "{{ ds }}" based on skipping holidays for that calendar and the timezone the DAG is in, and put them in the Macro Plugins
5. For every DAG, make sure you correctly put the Holiday Branch Operator upstream, and be very careful to only ever use the custom datetime macros that you've written

And that is the simplest use case. In the real world you also need to deal with weekends that fall on different days, "reverse holiday weekends" where days that are usually a weekend become a working day (a real thing in India), and schedules that follow things like "the 2nd Monday after Easter" or "every 15 minutes between 9am and 5am on a working day and every hour outside that time".

I've handled all such situations, but I've really felt like I've had to fight to get what I need out of Airflow, which in retrospect is odd for a tool that is designed to schedule workflows.

Anyway, my 2 cents as an Airflow user.
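For what it's worth, the calendar logic behind steps 2 to 4 can be sketched in a few lines of plain Python. This is only an illustration under my own assumptions: `WorkingDayCalendar` is a made-up helper, not an Airflow API, it works on local calendar dates (so it sidesteps the UTC normalization issue rather than solving it), and the holiday dates in the usage example are invented.

```python
from datetime import date, timedelta
from typing import AbstractSet

SATURDAY, SUNDAY = 5, 6  # date.weekday() numbering, where Monday is 0


class WorkingDayCalendar:
    """Hypothetical working-day calendar; not part of Airflow."""

    def __init__(
        self,
        holidays: AbstractSet[date],
        extra_working_days: AbstractSet[date] = frozenset(),
        weekend: AbstractSet[int] = frozenset({SATURDAY, SUNDAY}),
    ) -> None:
        self.holidays = frozenset(holidays)
        # "Reverse holiday weekends": weekend days that are worked anyway.
        self.extra_working_days = frozenset(extra_working_days)
        # Configurable, since weekends fall on different days by country.
        self.weekend = frozenset(weekend)

    def is_working_day(self, day: date) -> bool:
        if day in self.extra_working_days:
            return True
        if day in self.holidays:
            return False
        return day.weekday() not in self.weekend

    def _step(self, day: date, direction: int) -> date:
        # Walk one day at a time; bounded so a bad calendar cannot loop forever.
        candidate = day
        for _ in range(366):
            candidate += timedelta(days=direction)
            if self.is_working_day(candidate):
                return candidate
        raise ValueError("no working day found within 366 days")

    def next_working_day(self, day: date) -> date:
        return self._step(day, 1)

    def previous_working_day(self, day: date) -> date:
        return self._step(day, -1)


# Usage with invented dates: Friday 2020-05-01 is a holiday and
# Saturday 2020-05-09 is a make-up working day.
calendar = WorkingDayCalendar(
    holidays={date(2020, 5, 1)},
    extra_working_days={date(2020, 5, 9)},
)
next_run = calendar.next_working_day(date(2020, 4, 30))  # Monday 2020-05-04
```

The logic itself is small; the pain is in having to wire it into operators and macros for every DAG.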
Regards
Damian

-----Original Message-----
From: Dan Davydov <ddavy...@twitter.com.INVALID>
Sent: Monday, May 11, 2020 11:22
To: dev@airflow.apache.org
Subject: Re: Setting to add choice of schedule at end or schedule at start of interval

I strongly agree with Ash, I also think we should strive to decrease the complexity of core Airflow components and not offer customization/extensibility especially in the form of plugins where it is not needed to make Airflow more robust and easier to reason about (less testing configuration). I think there is some potential one-time migration pain we need to go through here but would prefer a simpler solution here (something similar to the original github PR that was raised to resolve this issue, having trouble finding it at the moment), with other things like data lineage being addressed in separate features to keep the scope down.

On Mon, May 11, 2020 at 6:15 AM Ash Berlin-Taylor <a...@apache.org> wrote:

> > Ash, you had mentioned something about some plans that were in
> > conflict with the above hack.... could you maybe share a thought or
> > two about what you were thinking?
> >
>
> The main thing here is around scheduler HA, and I want the scheduler
> to be able to make all scheduling decisions without needing to load
> any user-provided code.
>
> i.e. that A "dag serializer" process can be the only thing in the
> scheduler that loads/executes user code, and all other scheduling
> decisions operate on the (de)serialized version..
>
> -ash
>
> On May 6 2020, at 7:21 pm, Daniel Standish <dpstand...@gmail.com> wrote:
>
> > Inspired by James, I tried this out...
> >
> > For others interested, here is sample dag to test it out:
> >
> > class MyDAG(DAG):
> >     def following_schedule(self, dttm):
> >         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
> >         minutes = pen_dt.minute
> >         minutes_mod = minutes % 10
> >         if minutes_mod < 5:
> >             return pen_dt.add(minutes=1)
> >         else:
> >             return pen_dt.add(minutes=10 - minutes_mod)
> >
> >     def previous_schedule(self, dttm):
> >         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
> >         minutes = pen_dt.minute
> >         minutes_mod = minutes % 10
> >         if minutes_mod < 5:
> >             return pen_dt.add(minutes=-1)
> >         else:
> >             return pen_dt.add(minutes=-(minutes_mod - 5))
> >
> >
> > dag = MyDAG(
> >     dag_id='test_schd',
> >     default_args=default_args,
> >     schedule_interval='@daily',
> >     catchup=True,
> >     concurrency=1000,
> >     max_active_runs=10,
> > )
> >
> >
> > with dag:
> >     DummyOperator(task_id='test', task_concurrency=1000)
> >
> > What this will do is trigger one run for every minute when minutes
> > (mod 10) is between 0-4 but not schedule anything between 5-9 *(or
> > something like that, i did not scrutinize the edges carefully)*.
> >
> > But... anyway, it'll prove that it works relatively quickly and
> > that's the point.
> >
> > I have a use case. I think i might use it, rather than adding
> > branching logic. It's ugly, but they are both ugly.
> >
> > *Question*
> >
> > What do people think about a Schedule abstraction that takes the
> > previous_schedule and following_schedule methods from dag (perhaps
> > rename to previous_execution following_execution?)
> >
> > Then I imagine we could do this:
> > * deprecate the `schedule_interval` param
> > * rename schedule_interval to ``schedule: Union[Schedule, str, timedelta]``
> >   and preserve backward compatibility
> > * if str or timedelta is given, we instantiate a suitable Schedule object.
> >   Perhaps there is a CronSchedule and a TimedeltaSchedule.
> >
> > Any interest?
> >
> > Ash, you had mentioned something about some plans that were in
> > conflict with the above hack.... could you maybe share a thought or
> > two about what you were thinking?
> >
> > *Another idea*
> >
> > If we could maybe leave it to the `Schedule` class to decide the
> > relationship between run time and "execution_date". There is the
> > "interval edge PR"... But maybe there would be an elegant way to
> > control that behavior in the Schedule class. Perhaps simply a class
> > constant, or param.