> Ash, you had mentioned something about some plans that were in conflict
> with the above hack.... could you maybe share a thought or two about what
> you were thinking?
>
The main concern here is scheduler HA: I want the scheduler to be able to make all scheduling decisions without needing to load any user-provided code. That is, a "DAG serializer" process can be the only thing in the scheduler that loads or executes user code, and all other scheduling decisions operate on the (de)serialized version.

-ash

On May 6 2020, at 7:21 pm, Daniel Standish <dpstand...@gmail.com> wrote:
> Inspired by James, I tried this out...
>
> For others interested, here is a sample DAG to test it out:
>
> class MyDAG(DAG):
>     def following_schedule(self, dttm):
>         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
>         minutes = pen_dt.minute
>         minutes_mod = minutes % 10
>         if minutes_mod < 5:
>             return pen_dt.add(minutes=1)
>         else:
>             return pen_dt.add(minutes=10 - minutes_mod)
>
>     def previous_schedule(self, dttm):
>         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
>         minutes = pen_dt.minute
>         minutes_mod = minutes % 10
>         if minutes_mod < 5:
>             return pen_dt.add(minutes=-1)
>         else:
>             return pen_dt.add(minutes=-(minutes_mod - 5))
>
>
> dag = MyDAG(
>     dag_id='test_schd',
>     default_args=default_args,
>     schedule_interval='@daily',
>     catchup=True,
>     concurrency=1000,
>     max_active_runs=10,
> )
>
>
> with dag:
>     DummyOperator(task_id='test', task_concurrency=1000)
>
> What this will do is trigger one run for every minute when minutes
> (mod 10) is between 0-4 but not schedule anything between 5-9 *(or
> something like that, I did not scrutinize the edges carefully)*.
>
> But... anyway, it'll prove that it works relatively quickly and that's
> the point.
>
> I have a use case. I think I might use it, rather than adding branching
> logic. It's ugly, but they are both ugly.
>
> *Question*
>
> What do people think about a Schedule abstraction that takes the
> previous_schedule and following_schedule methods from dag (perhaps renamed
> to previous_execution and following_execution?)
>
> Then I imagine we could do this:
> * deprecate the `schedule_interval` param
> * rename schedule_interval to ``schedule: Union[Schedule, str, timedelta]``
>   and preserve backward compatibility
> * if str or timedelta is given, we instantiate a suitable Schedule object.
>   Perhaps there is a CronSchedule and a TimedeltaSchedule.
>
> Any interest?
>
> Ash, you had mentioned something about some plans that were in conflict
> with the above hack.... could you maybe share a thought or two about what
> you were thinking?
>
> *Another idea*
>
> If we could maybe leave it to the `Schedule` class to decide the
> relationship between run time and "execution_date". There is the "interval
> edge PR"... But maybe there would be an elegant way to control that
> behavior in the Schedule class. Perhaps simply a class constant, or param.
>
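
To make the proposed `Schedule` abstraction a bit more concrete, here is a rough, stdlib-only sketch of what it might look like. Nothing below is existing Airflow API: the method names `following` and `previous`, the helper `as_schedule`, and the class `FirstHalfOfTenMinutes` are all hypothetical names chosen for illustration. The last class follows the quoted hack's `following_schedule`, with `previous` written so that the two methods are inverses on schedule points (an assumption about the intent, since the edges were left unscrutinized). The `str` branch is deliberately left unimplemented; a `CronSchedule` would presumably wrap a cron library.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Union


class Schedule(ABC):
    """Hypothetical base class: knows only how to step between run dates."""

    @abstractmethod
    def following(self, dttm: datetime) -> datetime:
        """Return the first schedule point after dttm."""

    @abstractmethod
    def previous(self, dttm: datetime) -> datetime:
        """Return the last schedule point before dttm."""


class TimedeltaSchedule(Schedule):
    """Fixed-interval schedule, built when a timedelta is passed in."""

    def __init__(self, delta: timedelta) -> None:
        if delta <= timedelta(0):
            raise ValueError("delta must be positive")
        self.delta = delta

    def following(self, dttm: datetime) -> datetime:
        return dttm + self.delta

    def previous(self, dttm: datetime) -> datetime:
        return dttm - self.delta


class FirstHalfOfTenMinutes(Schedule):
    """Schedule points at minutes 0..5 of every ten-minute block.

    Inputs are truncated to the minute, as in the quoted code, so this
    assumes callers pass minute-aligned datetimes.
    """

    def following(self, dttm: datetime) -> datetime:
        base = dttm.replace(second=0, microsecond=0)
        mod = base.minute % 10
        step = 1 if mod < 5 else 10 - mod
        return base + timedelta(minutes=step)

    def previous(self, dttm: datetime) -> datetime:
        base = dttm.replace(second=0, microsecond=0)
        mod = base.minute % 10
        if mod == 0:
            step = 5        # jump back over the 6..9 gap to minute 5
        elif mod <= 5:
            step = 1
        else:
            step = mod - 5  # inside the gap: fall back to minute 5
        return base - timedelta(minutes=step)


def as_schedule(value: Union[Schedule, str, timedelta]) -> Schedule:
    """Coerce the proposed `schedule` argument into a Schedule object."""
    if isinstance(value, Schedule):
        return value
    if isinstance(value, timedelta):
        return TimedeltaSchedule(value)
    if isinstance(value, str):
        # A CronSchedule would be constructed here; omitted in this sketch.
        raise NotImplementedError("cron expressions are not handled here")
    raise TypeError(f"unsupported schedule value: {value!r}")
```

With something like this, `as_schedule(timedelta(hours=1))` would yield a `TimedeltaSchedule`, while a custom subclass passes through unchanged. One design point worth noting: purely declarative schedules such as `TimedeltaSchedule` carry only data and so could in principle be serialized, whereas a subclass with arbitrary Python logic is user code, which is where this idea meets the scheduler HA concern above.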