I strongly agree with Ash. I also think we should strive to reduce the
complexity of core Airflow components and not offer
customization/extensibility, especially in the form of plugins, where it is
not needed. That makes Airflow more robust and easier to reason about (fewer
configurations to test). There is some potential one-time migration
pain we need to go through here, but I would prefer a simpler solution
(something similar to the original GitHub PR that was raised to resolve
this issue; I am having trouble finding it at the moment), with other things
like data lineage addressed in separate features to keep the scope
down.

On Mon, May 11, 2020 at 6:15 AM Ash Berlin-Taylor <a...@apache.org> wrote:

> > Ash, you had mentioned something about some plans that were in conflict
> > with the above hack.... could you maybe share a thought or two about what
> > you were thinking?
> >
>
> The main thing here is around scheduler HA, and I want the scheduler to
> be able to make all scheduling decisions without needing to load any
> user-provided code.
>
> i.e. a "dag serializer" process can be the only thing in the
> scheduler that loads/executes user code, and all other scheduling
> decisions operate on the (de)serialized version.
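
To make the split concrete, here is a minimal sketch of a scheduler-side decision that uses only serialized data. The function names and the JSON layout are assumptions made up for illustration; this is not Airflow's actual serialization format.

```python
import json
from datetime import datetime, timedelta


def serialize_schedule(interval: timedelta) -> str:
    """Hypothetical serializer step: the only place that would touch user code."""
    return json.dumps({"type": "timedelta", "seconds": interval.total_seconds()})


def next_run_from_serialized(payload: str, last_run: datetime) -> datetime:
    """Hypothetical scheduler step: works from the serialized data alone."""
    data = json.loads(payload)
    if data["type"] != "timedelta":
        raise ValueError(f"unknown schedule type: {data['type']!r}")
    return last_run + timedelta(seconds=data["seconds"])


payload = serialize_schedule(timedelta(hours=1))
print(next_run_from_serialized(payload, datetime(2020, 5, 11, 6, 0)))
# 2020-05-11 07:00:00
```

A DAG subclass that overrides following_schedule with arbitrary Python has no data-only form like this, which is presumably where the hack and this plan pull in different directions.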
>
> -ash
>
> On May 6 2020, at 7:21 pm, Daniel Standish <dpstand...@gmail.com> wrote:
>
> > Inspired by James, I tried this out...
> >
> > For others interested, here is sample dag to test it out:
> >
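> > import pendulum
> > from airflow import DAG
> > from airflow.operators.dummy_operator import DummyOperator
> >
> > class MyDAG(DAG):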
> >    def following_schedule(self, dttm):
> >        pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
> >        minutes = pen_dt.minute
> >        minutes_mod = minutes % 10
> >        if minutes_mod < 5:
> >            return pen_dt.add(minutes=1)
> >        else:
> >            return pen_dt.add(minutes=10 - minutes_mod)
> >
> >    def previous_schedule(self, dttm):
> >        pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
> >        minutes = pen_dt.minute
> >        minutes_mod = minutes % 10
> >        if minutes_mod < 5:
> >            return pen_dt.add(minutes=-1)
> >        else:
> >            return pen_dt.add(minutes=-(minutes_mod - 5))
> >
> >
> > dag = MyDAG(
> >    dag_id='test_schd',
> >    default_args=default_args,
> >    schedule_interval='@daily',
> >    catchup=True,
> >    concurrency=1000,
> >    max_active_runs=10,
> > )
> >
> >
> > with dag:
> >    DummyOperator(task_id='test', task_concurrency=1000)
> >
> > What this will do is trigger one run for every minute when minutes
> > (mod 10) is between 0-4 but not schedule anything between 5-9 *(or
> > something like that, I did not scrutinize the edges carefully)*.
> >
> > But...  anyway, it'll prove that it works relatively quickly and
> > that's the point.
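
The edge behaviour of the sample following_schedule can be checked without running a scheduler. The sketch below replays the same arithmetic with the standard library only (datetime stands in for pendulum, and the start time is an arbitrary assumption) and lists the minutes it lands on:

```python
from datetime import datetime, timedelta


def following_schedule(dttm):
    """Same arithmetic as MyDAG.following_schedule, with datetime instead of pendulum."""
    base = dttm.replace(second=0, microsecond=0)
    minutes_mod = base.minute % 10
    if minutes_mod < 5:
        return base + timedelta(minutes=1)
    return base + timedelta(minutes=10 - minutes_mod)


def schedule_minutes(start, count):
    """Follow the schedule `count` times from `start`; return the minute of each point."""
    points = [start]
    for _ in range(count):
        points.append(following_schedule(points[-1]))
    return [p.minute for p in points]


# Arbitrary start time, chosen only for illustration.
minutes = schedule_minutes(datetime(2020, 5, 6, 19, 0), 12)
print(minutes)  # [0, 1, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15, 20]
```

Starting from a minute ending in 0, the points fall on minutes 0 through 5 of each ten-minute block, so minute 5 is included and 6-9 are skipped.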
> >
> > I have a use case.  I think i might use it, rather than adding branching
> > logic.  It's ugly, but they are both ugly.
> >
> > *Question*
> >
> > What do people think about a Schedule abstraction that takes the
> > previous_schedule and following_schedule methods from dag (perhaps
> > renamed to previous_execution and following_execution)?
> >
> > Then I imagine we could do this:
> > * deprecate the `schedule_interval` param
> > * rename schedule_interval to ``schedule: Union[Schedule, str,
> >   timedelta]`` and preserve backward compatibility
> > * if str or timedelta is given, we instantiate a suitable Schedule
> >   object. Perhaps there is a CronSchedule and a TimedeltaSchedule.
> >
> > Any interest?
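
For reference, the proposal could look roughly like the sketch below. Every name here (Schedule, TimedeltaSchedule, CronSchedule, as_schedule, and the method names) is hypothetical and taken from or modeled on the wording above; none of it is an existing Airflow API, and cron evaluation is deliberately left as a stub.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Union


class Schedule(ABC):
    """Hypothetical base class holding the two methods lifted out of DAG."""

    @abstractmethod
    def following_execution(self, dttm: datetime) -> datetime:
        ...

    @abstractmethod
    def previous_execution(self, dttm: datetime) -> datetime:
        ...


class TimedeltaSchedule(Schedule):
    """Fixed-interval schedule: step forward or back by one delta."""

    def __init__(self, delta: timedelta):
        self.delta = delta

    def following_execution(self, dttm: datetime) -> datetime:
        return dttm + self.delta

    def previous_execution(self, dttm: datetime) -> datetime:
        return dttm - self.delta


class CronSchedule(Schedule):
    """Stub only: a real version would evaluate the cron expression."""

    def __init__(self, expression: str):
        self.expression = expression

    def following_execution(self, dttm: datetime) -> datetime:
        raise NotImplementedError("cron evaluation is out of scope for this sketch")

    def previous_execution(self, dttm: datetime) -> datetime:
        raise NotImplementedError("cron evaluation is out of scope for this sketch")


def as_schedule(value: Union[Schedule, str, timedelta]) -> Schedule:
    """Backward-compatible coercion of the old schedule_interval values."""
    if isinstance(value, Schedule):
        return value
    if isinstance(value, timedelta):
        return TimedeltaSchedule(value)
    if isinstance(value, str):
        return CronSchedule(value)
    raise TypeError(f"unsupported schedule value: {value!r}")


hourly = as_schedule(timedelta(hours=1))
print(hourly.following_execution(datetime(2020, 5, 6, 19, 0)))
# 2020-05-06 20:00:00
```

The coercion function is what would keep existing DAGs working: a str or timedelta passed by an old DAG file is wrapped, while a Schedule instance passes through untouched.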
> >
> > Ash, you had mentioned something about some plans that were in conflict
> > with the above hack.... could you maybe share a thought or two about what
> > you were thinking?
> >
> > *Another idea*
> >
> > Maybe we could leave it to the `Schedule` class to decide the
> > relationship between run time and "execution_date".  There is the
> > "interval edge PR"...  But maybe there would be an elegant way to
> > control that behavior in the Schedule class.  Perhaps simply a class
> > constant, or param.
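
One way to read the class-constant idea is sketched below. The class names and the RUN_AT_INTERVAL_END constant are invented for illustration; the default mirrors the current behaviour, where the run for an execution_date starts once that interval has ended.

```python
from datetime import datetime, timedelta


class IntervalSchedule:
    # Hypothetical class constant: True means the run for an execution_date
    # starts only after its interval has ended.
    RUN_AT_INTERVAL_END = True

    def __init__(self, interval: timedelta):
        self.interval = interval

    def run_time(self, execution_date: datetime) -> datetime:
        """Earliest wall-clock time at which the run for execution_date may start."""
        if self.RUN_AT_INTERVAL_END:
            return execution_date + self.interval
        return execution_date


class StartEdgeSchedule(IntervalSchedule):
    # Hypothetical variant: run at the start of the interval instead.
    RUN_AT_INTERVAL_END = False


execution_date = datetime(2020, 5, 6)
print(IntervalSchedule(timedelta(days=1)).run_time(execution_date))
# 2020-05-07 00:00:00
print(StartEdgeSchedule(timedelta(days=1)).run_time(execution_date))
# 2020-05-06 00:00:00
```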
> >
>
