I just wanted to share this with everyone following this topic. I have a
case where I need a scheduled DAG run with the following schedule:
Sun: 10 PM
Monday-Thursday: 8 AM, 11 AM, 3:40 PM, 4:00 PM, 7:30 PM, 10 PM
Friday: 8 AM, 11 AM, 3:40 PM, 4:00 PM, 7:30 PM
There are certainly a few ways to address this with multiple DAGs,
time sensors, or trigger DAG run operators, but I wanted to do it in a
single DAG with no sensors. So, as Max suggested earlier, I overrode the
following/previous methods of the DAG. It has been running for a few days
and seems to be working.


import pendulum
from datetime import time, timedelta

# Weekday constants. The original post did not show their definition; they
# are assumed here to match datetime.weekday() (Monday=0 ... Sunday=6).
MON, TUE, WED, THU, FRI, SAT, SUN = range(7)

# this has a unique schedule
# Sunday - 10 PM
# M-Th - 8 AM, 11 AM, 3:40 PM, 4 PM, 7:30 PM, 10 PM
# Fr - 8 AM, 11 AM, 3:40 PM, 4 PM, 7:30 PM

def intraday_scheduler(dttm, schedule_type, _dag):
    tz = pendulum.timezone("US/Eastern")
    run_times = [
        time(8, 0, tzinfo=tz),
        time(11, 0, tzinfo=tz),
        time(15, 40, tzinfo=tz),
        time(16, 0, tzinfo=tz),
        time(19, 30, tzinfo=tz),
        time(22, 0, tzinfo=tz),
    ]
    day_times = {
        SUN: run_times[-1:],
        MON: run_times,
        TUE: run_times,
        WED: run_times,
        THU: run_times,
        FRI: run_times[:-1],
    }
    # Doubled so that stepping past the last scheduled day wraps to the first.
    # list() is needed on Python 3, where dict.keys() cannot be multiplied.
    day_times_lookup = list(day_times.keys()) * 2
    cur_dow = dttm.weekday()
    cur_dow_loc = day_times_lookup.index(cur_dow)
    next_day = day_times_lookup[cur_dow_loc + 1]
    prev_day = day_times_lookup[cur_dow_loc - 1]

    def get_times(dow):
        # Run times for weekday `dow`, converted to UTC.
        return [
            dttm.replace(
                hour=x.hour, minute=x.minute, second=0, microsecond=0,
                tzinfo=x.tzinfo
            ).astimezone(pendulum.timezone("UTC"))
            + timedelta(days=dow - cur_dow)
            for x in day_times.get(dow, [])
        ]

    all_run_times = sorted(
        get_times(prev_day) + get_times(cur_dow) + get_times(next_day)
    )
    upcoming_time = sorted(
        [dttm_ for dttm_ in all_run_times if dttm_ > dttm]
    )[0]
    if schedule_type == "following":
        # The assumption here is that if the dttm passed in is greater than
        # the latest execution date or the start date, then we are looking
        # for the "right edge" of the interval period. In order to make it
        # run right away, we make the scheduler believe the period end is
        # dttm + 1 microsecond. If that is not the case, just return the
        # next scheduled time.
        # `start_date` is not defined in this snippet; it is assumed to be
        # a module-level value defined alongside the DAG.
        if dttm > (_dag.latest_execution_date or start_date):
            return dttm.replace(microsecond=1)
        else:
            return upcoming_time
    if schedule_type == "previous":
        return all_run_times[all_run_times.index(upcoming_time) - 1]

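For anyone who wants to check the schedule itself outside of Airflow, here is a minimal standalone sketch of the same weekly table. It is not the code I am running and not an Airflow API: the names `WEEKLY_SCHEDULE`, `run_times_around`, `following_run` and `previous_run` are made up for illustration, it uses the standard-library `zoneinfo` (Python 3.9+) instead of pendulum, and it assumes `America/New_York` is an acceptable stand-in for `US/Eastern`. It picks the weekday from the local (Eastern) date rather than from the UTC value.

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")  # assumed equivalent of US/Eastern

INTRADAY = [time(8, 0), time(11, 0), time(15, 40),
            time(16, 0), time(19, 30), time(22, 0)]

# Keys follow datetime.weekday(): Monday=0 ... Sunday=6. Saturday has no runs.
WEEKLY_SCHEDULE = {
    6: INTRADAY[-1:],   # Sunday: 10 PM only
    0: INTRADAY,
    1: INTRADAY,
    2: INTRADAY,
    3: INTRADAY,
    4: INTRADAY[:-1],   # Friday: everything except 10 PM
}


def run_times_around(moment, days=3):
    """Scheduled runs (as UTC datetimes) within `days` local days of `moment`."""
    local_date = moment.astimezone(EASTERN).date()
    runs = []
    for offset in range(-days, days + 1):
        day = local_date + timedelta(days=offset)
        for t in WEEKLY_SCHEDULE.get(day.weekday(), []):
            local = datetime.combine(day, t, tzinfo=EASTERN)
            runs.append(local.astimezone(timezone.utc))
    return sorted(runs)


def following_run(moment):
    """First scheduled run strictly after `moment` (an aware datetime)."""
    return next(r for r in run_times_around(moment) if r > moment)


def previous_run(moment):
    """Last scheduled run strictly before `moment` (an aware datetime)."""
    return [r for r in run_times_around(moment) if r < moment][-1]


friday_last = datetime(2019, 9, 27, 19, 30, tzinfo=EASTERN)
print(following_run(friday_last).astimezone(EASTERN))
```

The three-day window on either side is enough because the longest gap in this schedule is from Friday 7:30 PM to Sunday 10 PM.
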
On Thu, Sep 26, 2019 at 6:54 AM James Coder <jcode...@gmail.com> wrote:

> I think that will lead to a very large number of questions about why it
> worked before and now doesn’t when doing a clean install.
> Additionally, if developing on a new install and deploying to an old
> install, you would get different behavior, adding to the confusion.
>
> James Coder
>
> > On Sep 26, 2019, at 6:41 AM, Ash Berlin-Taylor <a...@apache.org> wrote:
> >
> > I'm wondering if there is some way we can do this so that new installs
> > will pick up the new default, but anyone that carries over an airflow.cfg
> > from an old install will keep their existing behaviour.
> >
> > And then also is that a good/helpful idea or will that be more confusing
> > than not?
> >
> > -a
> >
> >> On 26 Sep 2019, at 11:40, Kaxil Naik <kaxiln...@gmail.com> wrote:
> >>
> >> I definitely agree. If we don't update it in 2.0 it is going to be hard
> >> to change that in any 2.x versions.
> >>
> >> On Thu, Sep 26, 2019 at 10:51 AM James Meickle
> >> <jmeic...@quantopian.com.invalid> wrote:
> >>
> >>> I am *strongly* in favor of using the 2.0 update to break compat here,
> >>> because this is a very confusing feature to most new users of Airflow,
> >>> but changing it will also break a _lot_ of DAGs. I feel like if we don't
> >>> change this in 2.0 we probably won't for any 2.x either, which would be
> >>> a shame.
> >>>
> >>>> On Wed, Sep 25, 2019 at 8:33 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
> >>>>
> >>>> I agree with Dan to change the default execution to the start of the
> >>>> interval.
> >>>>
> >>>> How about adding this for 2.0?
> >>>>
> >>>> Don't want to keep delaying this if we have a consensus already.
> >>>>
> >>>> Regards,
> >>>> Kaxil
> >>>>
> >>>>
> >>>> On Fri, Aug 23, 2019, 15:39 Dan Davydov <ddavy...@twitter.com.invalid>
> >>>> wrote:
> >>>>
> >>>>> What are people's feelings on changing the default execution to
> >>>>> schedule interval start and communicating this to existing users in
> >>>>> the Updating notes so that they can preserve the old behavior? This
> >>>>> could potentially cause headaches for users who don't read the notes,
> >>>>> but I think it might make sense to bite the bullet at some point for
> >>>>> more intuitive behavior overall for new users.
> >>>>>
> >>>>> On Fri, Aug 23, 2019 at 10:29 AM Dan Davydov <ddavy...@twitter.com>
> >>>>> wrote:
> >>>>>
> >>>>>> I am for this change, since I feel like in general the start of the
> >>>>>> interval is more intuitive (I have been working on Airflow for 3
> >>>>>> years and this still trips me up). That being said, I'm not sure how
> >>>>>> I feel about allowing customization at DAG level instead of cluster
> >>>>>> level, as it makes it harder to make assumptions about DAGs on the
> >>>>>> cluster for ops, though maybe this isn't a huge deal given there are
> >>>>>> tools available that show you why tasks aren't running.
> >>>>>>
> >>>>>> I agree with Bole that we should communicate recommended migration
> >>>>>> strategies if they can't be done automatically.
> >>>>>>
> >>>>>> I don't think I'm a fan of arbitrary customization of the interval
> >>>>>> via a callback; my feeling is this would not provide significant
> >>>>>> value and could be an ops nightmare.
> >>>>>>
> >>>>>> On Fri, Aug 23, 2019 at 9:11 AM Jarek Potiuk <jarek.pot...@polidea.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> DST: I recall problems with DST, especially when the hour goes back
> >>>>>>> and the daily schedule time technically occurs twice the same day or
> >>>>>>> does not occur at all. We have some code that arbitrarily chooses
> >>>>>>> the first occurrence in the latter case (there was a problem that it
> >>>>>>> worked differently in Python 3.6 vs 3.5 (!)). But the case when we
> >>>>>>> move forward is also an interesting one. I am not 100% sure it will
> >>>>>>> work correctly after changing the scheduling mechanisms, but it's
> >>>>>>> rather easy to test and there is no harm in adding it.
> >>>>>>> There is DST-specific logic implemented in our next/previous run
> >>>>>>> calculation and I imagine it could go wrong.
> >>>>>>>
> >>>>>>> The tests I am talking about:
> >>>>>>>
> >>>>>>> DagTest.test_following_previous_schedule_daily_dag_CEST_to_CET/
> >>>>>>> DagTest.test_following_previous_schedule_daily_dag_CET_to_CEST.
> >>>>>>>
> >>>>>>> Re: arbitrary customisation/converting DAGs. I think there is no
> >>>>>>> need to convert existing DAGs - the default behaviour remains as it
> >>>>>>> is, as far as I understand. And this flag is much simpler to
> >>>>>>> understand and reason about than an arbitrary function, and it
> >>>>>>> corresponds to real business cases:
> >>>>>>>
> >>>>>>> 1) schedule_at_interval_end = True -> wait for the data to be ready
> >>>>>>> for the interval (current/default behaviour related to processing
> >>>>>>> batches of data)
> >>>>>>> 2) schedule_at_interval_end = False -> CRON-like behaviour where we
> >>>>>>> simply run an arbitrary operation at regular intervals (more
> >>>>>>> intuitive for people who are used to CRON-like jobs)
> >>>>>>>
> >>>>>>> You can always build your schedule differently if you need
> >>>>>>> something "in-between" IMHO.
> >>>>>>>
> >>>>>>> J.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Aug 23, 2019 at 8:44 AM James Meickle
> >>>>>>> <jmeic...@quantopian.com.invalid> wrote:
> >>>>>>>
> >>>>>>>> This is a change to one of Airflow's core concepts, and it would
> >>>>>>>> require a lot of work for existing DAGs to cut over to it. Given
> >>>>>>>> that, my personal preference would be to allow arbitrary
> >>>>>>>> customization rather than just a bit toggle, such as allowing
> >>>>>>>> passing in a mapping function: given an interval's start date and
> >>>>>>>> end date, when should it be executed?
> >>>>>>>>
> >>>>>>>> On Fri, Aug 23, 2019 at 8:24 AM Jarek Potiuk <jarek.pot...@polidea.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Happy for it as well. There are a number of cases where scheduling
> >>>>>>>>> at start makes more sense and, as we see, Airflow is now used in
> >>>>>>>>> multiple cases where there is no need to process data from an
> >>>>>>>>> interval and wait until that data is ready.
> >>>>>>>>> But indeed some more tests would be great - especially for edge
> >>>>>>>>> cases. Changing mid-air is one, but I think there should be a test
> >>>>>>>>> for Daylight Saving Time changes.
> >>>>>>>>> There are some tests for DST, so they just need to be extended to
> >>>>>>>>> cover those two different cases.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> J.
> >>>>>>>>>
> >>>>>>>>> On Fri, Aug 23, 2019 at 7:37 AM Kaxil Naik <kaxiln...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Happy for this feature to be merged
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Aug 23, 2019, 11:49 Ash Berlin-Taylor <a...@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> This has come up a few times before. Someone has now opened a PR
> >>>>>>>>>>> that makes this a global+per-DAG setting:
> >>>>>>>>>>> https://github.com/apache/airflow/pull/5787 and it also includes
> >>>>>>>>>>> docs that I think do a good job of illustrating the two modes.
> >>>>>>>>>>>
> >>>>>>>>>>> Does anyone object to this being merged? If no one says anything
> >>>>>>>>>>> by midday on Tuesday I will take that as assent and will merge it.
> >>>>>>>>>>>
> >>>>>>>>>>> The docs from the PR included below.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Ash
> >>>>>>>>>>>
> >>>>>>>>>>> Scheduled Time vs Execution Time
> >>>>>>>>>>> ''''''''''''''''''''''''''''''''
> >>>>>>>>>>>
> >>>>>>>>>>> A DAG with a ``schedule_interval`` will execute once per
> >>>>>>>>>>> interval. By default, the execution of a DAG will occur at the
> >>>>>>>>>>> **end** of the schedule interval.
> >>>>>>>>>>>
> >>>>>>>>>>> A few examples:
> >>>>>>>>>>>
> >>>>>>>>>>> - A DAG with ``schedule_interval='@hourly'``: The DAG run that
> >>>>>>>>>>>   processes 2019-08-16 17:00 will start running just after
> >>>>>>>>>>>   2019-08-16 17:59:59, i.e. once that hour is over.
> >>>>>>>>>>> - A DAG with ``schedule_interval='@daily'``: The DAG run that
> >>>>>>>>>>>   processes 2019-08-16 will start running shortly after
> >>>>>>>>>>>   2019-08-17 00:00.
> >>>>>>>>>>>
> >>>>>>>>>>> The reasoning behind this execution vs scheduling behaviour is
> >>>>>>>>>>> that data for the interval to be processed won't be fully
> >>>>>>>>>>> available until the interval has elapsed.
> >>>>>>>>>>>
> >>>>>>>>>>> In cases where you wish the DAG to be executed at the **start**
> >>>>>>>>>>> of the interval, specify ``schedule_at_interval_end=False``,
> >>>>>>>>>>> either in ``airflow.cfg``, or on a per-DAG basis.
> >>>>>>>>>>

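To make the two modes described in the quoted docs concrete, here is a small illustrative sketch. The flag name `schedule_at_interval_end` comes from the PR discussed in the thread; the helper `run_start` is hypothetical, written only for this example, and is not part of Airflow.

```python
from datetime import datetime, timedelta


def run_start(interval_start, interval_length, schedule_at_interval_end=True):
    """Earliest moment the run covering the interval may start.

    Hypothetical helper: with the flag on (the default described in the
    quoted docs) the run waits for the interval to elapse; with it off the
    run starts at the beginning of the interval, cron-style.
    """
    if schedule_at_interval_end:
        return interval_start + interval_length
    return interval_start


hourly = datetime(2019, 8, 16, 17, 0)
daily = datetime(2019, 8, 16)

print(run_start(hourly, timedelta(hours=1)))         # 2019-08-16 18:00:00
print(run_start(daily, timedelta(days=1)))           # 2019-08-17 00:00:00
print(run_start(hourly, timedelta(hours=1), False))  # 2019-08-16 17:00:00
```

The first two calls reproduce the `@hourly` and `@daily` examples from the docs; the third shows the same hourly interval with the flag turned off.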