My understanding here is that some users just want to trigger a DAG at a time they choose rather than always at the end of the schedule_interval. I saw two use cases in the thread:

1. The use case from Daniel wants to trigger at the start of the schedule_interval.
2. The use case from Damian Shaw has some holidays to skip.
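
To make the first case concrete: with a fixed interval, the run stamped with execution_date T normally starts once the interval it covers has ended, i.e. at T + interval, whereas the start-of-interval case wants it to start at T. A rough sketch of that arithmetic in plain Python (run_start_time and at_interval_start are made-up names for illustration, not Airflow APIs):

```python
from datetime import datetime, timedelta


def run_start_time(execution_date, interval, at_interval_start=False):
    """Return the earliest time the run for `execution_date` would start.

    Hypothetical helper for illustration only; not an Airflow function.
    """
    if at_interval_start:
        # Start-of-interval case: start as soon as the interval begins.
        return execution_date
    # Default behaviour: start once the interval has ended.
    return execution_date + interval


daily = timedelta(days=1)
t = datetime(2020, 5, 18)
print(run_start_time(t, daily))                          # 2020-05-19 00:00:00
print(run_start_time(t, daily, at_interval_start=True))  # 2020-05-18 00:00:00
```
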

For both use cases, has anyone considered setting schedule_interval to None and using a TriggerDagRunOperator to trigger the DAG instead? If you add the following entry to the [scheduler] section of airflow.cfg, you can trigger the DAG at any time or interval, using any logic you like.

# Allow externally triggered DagRuns for Execution Dates in the future
# Only has effect if schedule_interval is set to None in DAG
# We set this to True so that tasks on the DagRun with execution_date T can start running before midnight in UTC on date T.
allow_trigger_in_future = True

Thanks,
Qian

On Mon, May 18, 2020 at 1:07 AM Bas Harenslak <basharens...@godatadriven.com.invalid> wrote:

> My 2c: adding an option to schedule at the start of an interval is yet another option to take in, adding more complexity. Therefore I’m not in favour of it.
>
> Scheduling at start/end has often been discussed; IMO it's a fact one has to know when learning Airflow. But it is not something that is either good or bad, so changing it would introduce confusion.
>
> What is bad IMO is adding more options to change the default behaviour, making Airflow even more complex to comprehend. The core functionality should be as simple as possible.
>
> Besides the start/end discussion, @Damian, the scheduling you described there seems like a valid use case. I’m thinking something like a “scheduling blacklist” would be a nice feature here. Could you create a GitHub issue for that?
>
> Bas
>
> On 11 May 2020, at 18:17, Shaw, Damian P. <damian.sha...@credit-suisse.com> wrote:
>
> I strongly agree with reducing Airflow's complexity, and the plan to serialize DAGs by default makes a lot of sense. So wouldn't it make sense to also serialize the schedule of a DAG?
> e.g.
> Provide a table of execution dates so that the scheduler can keep the upcoming ones cached and periodically check for updates, and give the user a way to update it if they need to.
>
> Outside of customization, Airflow's current schedule definition solution is fairly weak.
>
> For example, the very common use case of following a country's working-day calendar is actually pretty tricky for a new user to implement in Airflow. Say you want to follow a fairly normal East Asian calendar; the currently recommended way to do this in Airflow is the following:
>
> 1. Create a DAG scheduled Monday - Friday
> 2. Write a Holiday Branch / Short Circuit Operator to skip tasks on a holiday, put it inside your own custom Airflow Operators package, and distribute it with every Airflow instance
> 3. Write a function to calculate the actual "{{ next_ds }}", as the one Airflow provides is normalized to UTC and is therefore usually 1 day out for a timezone-aware DAG in Asia; put it in the Macro Plugins
> 4. Write functions that can calculate the actual "{{ prev_ds }}" and "{{ ds }}" based on skipping holidays for that calendar and the timezone the DAG is in; put them in the Macro Plugins
> 5. For every DAG, make sure you correctly put the Holiday Branch Operator upstream, and be very careful to only ever use the custom datetime macros that you've written
>
> And that is the simplest use case. In the real world you also need to deal with weekends that fall on different days, "reverse holiday weekends" where days that are usually a weekend become a working day (a real thing in India), and schedules that follow things like "the 2nd Monday after Easter" or "every 15 minutes between 9am and 5am on a working day and every hour outside that time". I've handled all such situations, but I've really felt like I've had to fight to get what I need out of Airflow, which in retrospect is odd for a tool that is designed to schedule workflows.
>
> Anyway, my 2 cents as an Airflow user.
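
The date arithmetic behind steps 1 to 4 above can be sketched in plain Python. This is only an illustration under stated assumptions: the function names, the holiday set, and the extra_working_days set (for the "reverse holiday weekend" case) are all hypothetical, the dates are sample values rather than a real calendar, and none of this is an Airflow macro or operator.

```python
from datetime import date, timedelta

ONE_DAY = timedelta(days=1)


def is_working_day(day, holidays, extra_working_days=frozenset()):
    """True if `day` is a working day under this (hypothetical) calendar."""
    if day in extra_working_days:
        # A weekend day declared a working day takes precedence.
        return True
    if day in holidays:
        return False
    return day.weekday() < 5  # Monday=0 ... Friday=4


def next_working_day(day, holidays, extra_working_days=frozenset()):
    """First working day strictly after `day`."""
    candidate = day + ONE_DAY
    while not is_working_day(candidate, holidays, extra_working_days):
        candidate += ONE_DAY
    return candidate


def prev_working_day(day, holidays, extra_working_days=frozenset()):
    """Last working day strictly before `day`."""
    candidate = day - ONE_DAY
    while not is_working_day(candidate, holidays, extra_working_days):
        candidate -= ONE_DAY
    return candidate


# Sample data only: Friday 1 May 2020 treated as a holiday.
holidays = {date(2020, 5, 1)}
print(next_working_day(date(2020, 4, 30), holidays))  # 2020-05-04
print(prev_working_day(date(2020, 5, 4), holidays))   # 2020-04-30
```

Functions like these would stand in for the custom next_ds / prev_ds macros; they take calendar dates in the DAG's own timezone and leave the UTC conversion to the caller.
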
>
> Regards
> Damian
>
> -----Original Message-----
> From: Dan Davydov <ddavy...@twitter.com.INVALID>
> Sent: Monday, May 11, 2020 11:22
> To: dev@airflow.apache.org
> Subject: Re: Setting to add choice of schedule at end or schedule at start of interval
>
> I strongly agree with Ash. I also think we should strive to decrease the complexity of core Airflow components and not offer customization/extensibility, especially in the form of plugins, where it is not needed, to make Airflow more robust and easier to reason about (less testing configuration). I think there is some potential one-time migration pain we need to go through here, but I would prefer a simpler solution (something similar to the original GitHub PR that was raised to resolve this issue; I'm having trouble finding it at the moment), with other things like data lineage being addressed in separate features to keep the scope down.
>
> On Mon, May 11, 2020 at 6:15 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>
> Ash, you had mentioned something about some plans that were in conflict with the above hack.... could you maybe share a thought or two about what you were thinking?
>
> The main thing here is around scheduler HA, and I want the scheduler to be able to make all scheduling decisions without needing to load any user-provided code.
>
> i.e. that a "dag serializer" process can be the only thing in the scheduler that loads/executes user code, and all other scheduling decisions operate on the (de)serialized version.
>
> -ash
>
> On May 6 2020, at 7:21 pm, Daniel Standish <dpstand...@gmail.com> wrote:
>
> Inspired by James, I tried this out...
>
> For others interested, here is a sample dag to test it out:
>
> import pendulum
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
> # default_args is assumed to be defined elsewhere (not shown in this email)
>
> class MyDAG(DAG):
>     def following_schedule(self, dttm):
>         # Truncate to the minute, then step forward
>         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
>         minutes = pen_dt.minute
>         minutes_mod = minutes % 10
>         if minutes_mod < 5:
>             return pen_dt.add(minutes=1)
>         else:
>             return pen_dt.add(minutes=10 - minutes_mod)
>
>     def previous_schedule(self, dttm):
>         # Truncate to the minute, then step backward
>         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
>         minutes = pen_dt.minute
>         minutes_mod = minutes % 10
>         if minutes_mod < 5:
>             return pen_dt.add(minutes=-1)
>         else:
>             return pen_dt.add(minutes=-(minutes_mod - 5))
>
>
> dag = MyDAG(
>     dag_id='test_schd',
>     default_args=default_args,
>     schedule_interval='@daily',
>     catchup=True,
>     concurrency=1000,
>     max_active_runs=10,
> )
>
>
> with dag:
>     DummyOperator(task_id='test', task_concurrency=1000)
>
> What this will do is trigger one run for every minute when minutes (mod 10) is between 0-4 but not schedule anything between 5-9 *(or something like that, I did not scrutinize the edges carefully)*.
>
> But... anyway, it'll prove that it works relatively quickly and that's the point.
>
> I have a use case. I think I might use it, rather than adding branching logic. It's ugly, but they are both ugly.
>
> *Question*
>
> What do people think about a Schedule abstraction that takes the previous_schedule and following_schedule methods from dag (perhaps rename to previous_execution / following_execution?)
>
> Then I imagine we could do this:
> * deprecate the `schedule_interval` param
> * rename schedule_interval to ``schedule: Union[Schedule, str, timedelta]`` and preserve backward compatibility
> * if str or timedelta is given, we instantiate a suitable Schedule object. Perhaps there is a CronSchedule and a TimedeltaSchedule.
>
> Any interest?
>
> Ash, you had mentioned something about some plans that were in conflict with the above hack....
> could you maybe share a thought or two about what you were thinking?
>
> *Another idea*
>
> Maybe we could leave it to the `Schedule` class to decide the relationship between run time and "execution_date". There is the "interval edge PR"... But maybe there would be an elegant way to control that behavior in the Schedule class. Perhaps simply a class constant, or param.
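
For reference, the `Schedule` idea quoted above might look roughly like the sketch below. It is a plain-Python illustration, not an existing Airflow API: the class names Schedule and TimedeltaSchedule and the two method names come from the thread, while run_at_interval_start, run_after and as_schedule are hypothetical names introduced here. A cron-based schedule is left out because it would need a cron parser.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Union


class Schedule(ABC):
    """Hypothetical base class; not part of Airflow."""

    # "Another idea": a class constant that decides whether a run starts
    # at the beginning or at the end of the interval it covers.
    run_at_interval_start = False

    @abstractmethod
    def following_schedule(self, dttm: datetime) -> datetime:
        """Next schedule point after `dttm`."""

    @abstractmethod
    def previous_schedule(self, dttm: datetime) -> datetime:
        """Schedule point before `dttm`."""

    def run_after(self, execution_date: datetime) -> datetime:
        """Earliest time at which the run for `execution_date` may start."""
        if self.run_at_interval_start:
            return execution_date
        return self.following_schedule(execution_date)


class TimedeltaSchedule(Schedule):
    """Fixed-interval schedule built from a timedelta."""

    def __init__(self, delta: timedelta):
        if delta <= timedelta(0):
            raise ValueError("delta must be positive")
        self.delta = delta

    def following_schedule(self, dttm: datetime) -> datetime:
        return dttm + self.delta

    def previous_schedule(self, dttm: datetime) -> datetime:
        return dttm - self.delta


def as_schedule(value: Union[Schedule, timedelta]) -> Schedule:
    """Coerce a backward-compatible value into a Schedule object."""
    if isinstance(value, Schedule):
        return value
    if isinstance(value, timedelta):
        return TimedeltaSchedule(value)
    raise TypeError(f"unsupported schedule value: {value!r}")


schedule = as_schedule(timedelta(days=1))
print(schedule.run_after(datetime(2020, 5, 18)))  # 2020-05-19 00:00:00
```

Because such an object only does date arithmetic, a variant that carries plain data (an interval, or a table of dates) could in principle be serialized, which is relevant to the concern about the scheduler not loading user code.
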