I strongly agree with reducing Airflow's complexity, and the plan to serialize 
DAGs by default makes a lot of sense. So wouldn't it make sense to also 
serialize the schedule of a DAG?
For example, provide a table of execution dates: the scheduler could keep the 
upcoming ones cached and periodically check for updates, and users would have 
a way to update the table when they need to.
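A minimal sketch of the stored-schedule idea, assuming the schedule is simply an explicit list of execution dates. `ExecutionDateTable`, `CachingReader` and the version counter used to detect user updates are hypothetical names for illustration, not Airflow APIs; a real version would presumably keep the table in the metadata database.

```python
from bisect import bisect_right
from datetime import datetime
from typing import Iterable, List, Optional


class ExecutionDateTable:
    """Hypothetical stored schedule: an explicit, sorted list of execution dates."""

    def __init__(self, dates: Iterable[datetime]) -> None:
        self._dates: List[datetime] = sorted(dates)
        self.version = 0  # bumped on every user update so readers can detect changes

    def replace(self, dates: Iterable[datetime]) -> None:
        """User-facing update: swap in a new set of dates."""
        self._dates = sorted(dates)
        self.version += 1

    def upcoming(self, after: datetime, limit: int) -> List[datetime]:
        """Return up to `limit` dates strictly after `after`."""
        start = bisect_right(self._dates, after)
        return self._dates[start:start + limit]


class CachingReader:
    """Scheduler-side view: caches a few upcoming dates and refreshes on change."""

    def __init__(self, table: ExecutionDateTable, cache_size: int = 5) -> None:
        self.table = table
        self.cache_size = cache_size
        self._seen_version: Optional[int] = None
        self._cache: List[datetime] = []

    def next_run(self, now: datetime) -> Optional[datetime]:
        # Forget cached dates that are no longer in the future.
        self._cache = [d for d in self._cache if d > now]
        # Periodic check: reload if the user changed the table or the cache ran dry.
        if self._seen_version != self.table.version or not self._cache:
            self._cache = self.table.upcoming(now, self.cache_size)
            self._seen_version = self.table.version
        return self._cache[0] if self._cache else None
```

The scheduler side only ever reads plain dates here, so deciding what is due next never requires running user code.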

Outside of customization, Airflow's current way of defining schedules is fairly 
weak.

For example, in the very common use case of following a country's working-day 
calendar, it's actually pretty tricky for a new user to implement in Airflow. 
Say you want to follow a fairly typical East Asian calendar; the currently 
recommended way to do this in Airflow is the following:

1. Create a DAG scheduled Monday to Friday.
2. Write a Holiday Branch / Short Circuit Operator to skip tasks on a holiday, 
put it in your own custom Airflow operators package, and distribute that 
package with every Airflow instance.
3. Write a function to calculate the actual "{{ next_ds }}", since the one 
Airflow provides is normalized to UTC and is therefore usually one day out for 
timezone-aware DAGs in Asia; put it in the macro plugins.
4. Write functions that calculate the actual "{{ prev_ds }}" and "{{ ds }}" 
based on skipping holidays for that calendar and the timezone the DAG is in; 
put them in the macro plugins.
5. For every DAG, make sure you correctly set the Holiday Branch Operator 
upstream of the other tasks, and be very careful to only ever use the custom 
datetime macros you've written.
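Steps 3 and 4 boil down to a few small date functions. A minimal sketch, assuming a Monday to Friday working week, a user-supplied set of holiday dates, and timezone-aware UTC execution dates. The function names and `SAMPLE_HOLIDAYS` are hypothetical, and a fixed UTC+9 offset stands in for the DAG's timezone (on Python 3.9+ a `zoneinfo.ZoneInfo("Asia/Tokyo")` object could be passed instead).

```python
from datetime import date, datetime, timedelta, timezone, tzinfo
from typing import AbstractSet


def is_working_day(day: date, holidays: AbstractSet[date]) -> bool:
    """Monday to Friday and not in the supplied holiday set."""
    return day.weekday() < 5 and day not in holidays


def local_ds(execution_date: datetime, tz: tzinfo) -> date:
    """Calendar date of an aware (UTC) execution_date as seen in the DAG's timezone."""
    return execution_date.astimezone(tz).date()


def next_working_day(day: date, holidays: AbstractSet[date]) -> date:
    candidate = day + timedelta(days=1)
    while not is_working_day(candidate, holidays):
        candidate += timedelta(days=1)
    return candidate


def prev_working_day(day: date, holidays: AbstractSet[date]) -> date:
    candidate = day - timedelta(days=1)
    while not is_working_day(candidate, holidays):
        candidate -= timedelta(days=1)
    return candidate


# Hypothetical example inputs: a UTC+9 DAG and a made-up block of holidays.
JST = timezone(timedelta(hours=9))
SAMPLE_HOLIDAYS = {date(2020, 5, 4), date(2020, 5, 5), date(2020, 5, 6)}
```

For instance, an execution date of 2020-05-06 15:00 UTC is already 2020-05-07 in UTC+9, which is the "one day out" problem from step 3. Functions like these could be exposed to templates through the DAG's `user_defined_macros` argument, and `is_working_day` could back a `ShortCircuitOperator` callable; either way they still have to be wired into every DAG by hand.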

And that is the simplest use case. In the real world you also need to deal 
with weekends that fall on different days, "reverse holiday weekends" where 
days that are usually a weekend become working days (a real thing in India), 
and schedules that follow things like "the 2nd Monday after Easter" or "every 
15 minutes between 9am and 5pm on a working day and every hour outside that 
time". I've handled all of these situations, but I've really felt like I've 
had to fight to get what I need out of Airflow, which in retrospect is odd for 
a tool that is designed to schedule workflows.
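The intraday rule quoted above is awkward to express as a single cron string but short as a function. A minimal sketch, assuming naive local times (DST is ignored), a busy window of 9:00 to 17:00, and an `is_working_day` date predicate supplied by the caller (for example one backed by a holiday calendar); `following_run` is a hypothetical name.

```python
from datetime import date, datetime, time, timedelta
from typing import Callable


def following_run(dttm: datetime, is_working_day: Callable[[date], bool]) -> datetime:
    """Next run strictly after dttm: every 15 minutes from 9:00 to 17:00 on
    working days, otherwise on the hour."""
    # Round down to the 15-minute grid, then step forward one slot at a time.
    slot = dttm.replace(minute=dttm.minute - dttm.minute % 15, second=0, microsecond=0)
    while True:
        slot += timedelta(minutes=15)
        in_busy_window = is_working_day(slot.date()) and time(9) <= slot.time() < time(17)
        # Whole hours always run; quarter hours only inside the busy window.
        if slot.minute == 0 or in_busy_window:
            return slot
```

Because every hourly slot is also on the 15-minute grid, the loop never needs more than four steps.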

Anyway, my 2 cents as an Airflow user.

Regards
Damian

-----Original Message-----
From: Dan Davydov <ddavy...@twitter.com.INVALID> 
Sent: Monday, May 11, 2020 11:22
To: dev@airflow.apache.org
Subject: Re: Setting to add choice of schedule at end or schedule at start of 
interval

I strongly agree with Ash. I also think we should strive to decrease the 
complexity of core Airflow components and avoid offering 
customization/extensibility (especially in the form of plugins) where it is not 
needed, to make Airflow more robust and easier to reason about (fewer 
configurations to test). There is some potential one-time migration pain we 
need to go through here, but I would prefer a simpler solution (something 
similar to the original GitHub PR that was raised to resolve this issue; I'm 
having trouble finding it at the moment), with other things like data lineage 
addressed in separate features to keep the scope down.

On Mon, May 11, 2020 at 6:15 AM Ash Berlin-Taylor <a...@apache.org> wrote:

> > Ash, you had mentioned something about some plans that were in 
> > conflict with the above hack.... could you maybe share a thought or 
> > two about what you were thinking?
> >
>
> The main thing here is around scheduler HA, and I want the scheduler 
> to be able to make all scheduling decisions without needing to load 
> any user-provided code.
>
> i.e. a "DAG serializer" process can be the only thing in the 
> scheduler that loads/executes user code, and all other scheduling 
> decisions operate on the (de)serialized version.
>
> -ash
>
> On May 6 2020, at 7:21 pm, Daniel Standish <dpstand...@gmail.com> wrote:
>
> > Inspired by James, I tried this out...
> >
> > For others interested, here is sample dag to test it out:
> >
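> > import pendulum
> >
> > from airflow import DAG
> > from airflow.operators.dummy_operator import DummyOperator
> > from airflow.utils.dates import days_ago
> >
> > # default_args was not shown originally; these values are an assumption.
> > default_args = {'owner': 'airflow', 'start_date': days_ago(1)}
> >
> >
> > class MyDAG(DAG):
> >     """Runs every minute while minute % 10 is 0-4; nothing at 5-9."""
> >
> >     def following_schedule(self, dttm):
> >         # Truncate to the whole minute before doing the arithmetic.
> >         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
> >         minutes_mod = pen_dt.minute % 10
> >         if minutes_mod < 4:
> >             # Still inside the 0-4 window: next run is one minute later.
> >             return pen_dt.add(minutes=1)
> >         # Minute 4 or later: jump to the start of the next 10-minute block.
> >         return pen_dt.add(minutes=10 - minutes_mod)
> >
> >     def previous_schedule(self, dttm):
> >         pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
> >         minutes_mod = pen_dt.minute % 10
> >         if minutes_mod == 0:
> >             # Previous run is minute 4 of the preceding 10-minute block.
> >             return pen_dt.add(minutes=-6)
> >         if minutes_mod <= 5:
> >             # Minutes 1-5: one minute back lands inside the 0-4 window.
> >             return pen_dt.add(minutes=-1)
> >         # Minutes 6-9: step back to minute 4 of the current block.
> >         return pen_dt.add(minutes=-(minutes_mod - 4))
> >
> >
> > dag = MyDAG(
> >     dag_id='test_schd',
> >     default_args=default_args,
> >     schedule_interval='@daily',
> >     catchup=True,
> >     concurrency=1000,
> >     max_active_runs=10,
> > )
> >
> >
> > with dag:
> >     DummyOperator(task_id='test', task_concurrency=1000)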
> >
> > This will trigger one run per minute while the minute (mod 10) is
> > between 0 and 4, and schedule nothing while it is between 5 and 9 *(or
> > something like that; I did not scrutinize the edges carefully)*.
> >
> > But anyway, it will quickly prove that this works, and that's the
> > point.
> >
> > I have a use case for this. I think I might use it rather than adding
> > branching logic. It's ugly, but both approaches are ugly.
> >
> > *Question*
> >
> > What do people think about a Schedule abstraction that takes over the
> > previous_schedule and following_schedule methods from DAG (perhaps
> > renamed to previous_execution and following_execution)?
> >
> > Then I imagine we could do this:
> > * deprecate the `schedule_interval` param
> > * rename `schedule_interval` to ``schedule: Union[Schedule, str, timedelta]``
> >   and preserve backward compatibility
> > * if a str or timedelta is given, instantiate a suitable Schedule
> >   object. Perhaps there is a CronSchedule and a TimedeltaSchedule.
> >
> > Any interest?
> >
> > Ash, you had mentioned something about some plans that were in 
> > conflict with the above hack.... could you maybe share a thought or 
> > two about what you were thinking?
> >
> > *Another idea*
> >
> > Maybe we could leave it to the `Schedule` class to decide the
> > relationship between run time and "execution_date". There is the
> > "interval edge PR", but maybe there would be an elegant way to control
> > that behavior in the Schedule class, perhaps simply with a class
> > constant or a param.
> >
>
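The `Schedule` abstraction discussed in the quoted thread could look roughly like this sketch, which also tries to respect Ash's constraint by making each schedule serializable to a plain dict that the scheduler turns back into a built-in class without importing user code. Everything here (`Schedule`, `TimedeltaSchedule`, `run_at_interval_end`, `as_schedule`, the registry) is hypothetical and not an Airflow API; a `CronSchedule` for string values is left out because it would need a cron parser such as croniter.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Dict, Type, Union


class Schedule(ABC):
    """Hypothetical schedule abstraction; not an existing Airflow class."""

    # Whether a run fires at the end of its interval (Airflow 1.x behaviour)
    # or at the start; a class constant that subclasses/instances may override.
    run_at_interval_end: bool = True

    @abstractmethod
    def following_execution(self, dttm: datetime) -> datetime: ...

    @abstractmethod
    def previous_execution(self, dttm: datetime) -> datetime: ...

    @abstractmethod
    def to_dict(self) -> Dict[str, Any]: ...

    @classmethod
    @abstractmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Schedule": ...

    def run_time(self, execution_date: datetime) -> datetime:
        """When the run labelled `execution_date` should actually start."""
        if self.run_at_interval_end:
            return self.following_execution(execution_date)
        return execution_date


# Only built-in classes are registered, so deserializing never imports user code.
_REGISTRY: Dict[str, Type[Schedule]] = {}


def _register(cls: Type[Schedule]) -> Type[Schedule]:
    _REGISTRY[cls.__name__] = cls
    return cls


@_register
class TimedeltaSchedule(Schedule):
    def __init__(self, delta: timedelta, run_at_interval_end: bool = True) -> None:
        self.delta = delta
        self.run_at_interval_end = run_at_interval_end

    def following_execution(self, dttm: datetime) -> datetime:
        return dttm + self.delta

    def previous_execution(self, dttm: datetime) -> datetime:
        return dttm - self.delta

    def to_dict(self) -> Dict[str, Any]:
        return {
            "type": type(self).__name__,
            "seconds": self.delta.total_seconds(),
            "run_at_interval_end": self.run_at_interval_end,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TimedeltaSchedule":
        return cls(timedelta(seconds=data["seconds"]), data["run_at_interval_end"])


def deserialize(data: Dict[str, Any]) -> Schedule:
    """Scheduler side: rebuild a Schedule from its serialized form."""
    return _REGISTRY[data["type"]].from_dict(data)


def as_schedule(value: Union[Schedule, str, timedelta]) -> Schedule:
    """Backward-compatible coercion of old schedule_interval values."""
    if isinstance(value, Schedule):
        return value
    if isinstance(value, timedelta):
        return TimedeltaSchedule(value)
    # A CronSchedule would handle strings; omitted to keep this stdlib-only.
    raise NotImplementedError(f"no Schedule for {value!r} in this sketch")
```

The `run_at_interval_end` flag is one way to express the "schedule at end or start of interval" choice as a class constant or parameter, in the spirit of the "Another idea" suggestion.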


