I just wanted to share this with everyone interested in this topic. I have a case where I need a scheduled DAG run on the following schedule (US/Eastern):

Sun: 10 PM
Mon-Thu: 8 AM, 11 AM, 3:40 PM, 4:00 PM, 7:30 PM, 10 PM
Fri: 8 AM, 11 AM, 3:40 PM, 4:00 PM, 7:30 PM

There are certainly a few ways to address this with multiple DAGs, time sensors, or TriggerDagRunOperator, but I wanted to do it in a single DAG with no sensors. So, as Max suggested earlier, I overrode the DAG's following_schedule/previous_schedule methods. It has been running for a few days and seems to be working.
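The core idea, a table from weekday to local run times plus a search for the nearest run, can be checked outside Airflow. Below is a minimal standalone sketch using only the standard library (Python 3.9+ for zoneinfo); it is not the code running in the DAG, and the names SCHEDULE, candidate_runs, next_run and previous_run are hypothetical. It assumes America/New_York is equivalent to the "US/Eastern" zone used in the DAG.

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

# Assumption: America/New_York is the canonical name for "US/Eastern".
EASTERN = ZoneInfo("America/New_York")

RUN_TIMES = [time(8, 0), time(11, 0), time(15, 40), time(16, 0), time(19, 30), time(22, 0)]

# Keys follow datetime.weekday(): Monday == 0 ... Sunday == 6; Saturday (5) has no runs.
SCHEDULE = {
    0: RUN_TIMES, 1: RUN_TIMES, 2: RUN_TIMES, 3: RUN_TIMES,  # Mon-Thu
    4: RUN_TIMES[:-1],  # Fri: no 10 PM run
    6: RUN_TIMES[-1:],  # Sun: 10 PM only
}


def candidate_runs(dttm):
    """All scheduled runs, as UTC datetimes, within a week either side of aware dttm."""
    local_day = dttm.astimezone(EASTERN).date()
    runs = []
    for offset in range(-7, 8):
        day = local_day + timedelta(days=offset)
        for t in SCHEDULE.get(day.weekday(), []):
            # Attaching the zone to the wall-clock time lets zoneinfo choose
            # the EST or EDT offset that applies on that date.
            local_run = datetime.combine(day, t, tzinfo=EASTERN)
            runs.append(local_run.astimezone(timezone.utc))
    return sorted(runs)


def next_run(dttm):
    """First scheduled run strictly after dttm."""
    return next(run for run in candidate_runs(dttm) if run > dttm)


def previous_run(dttm):
    """Last scheduled run strictly before dttm."""
    return [run for run in candidate_runs(dttm) if run < dttm][-1]


# Friday 2019-09-27 8 PM EDT (= 2019-09-28 00:00 UTC) is after Friday's last run,
# so the next one is Sunday 10 PM EDT.
print(next_run(datetime(2019, 9, 28, 0, 0, tzinfo=timezone.utc)))  # 2019-09-30 02:00:00+00:00
```

Scanning a fixed window of days, rather than stepping to a "previous" and "next" weekday, keeps the lookup correct across the Sunday/Monday wrap-around and on Saturdays, which have no runs at all.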
# This DAG has a unique schedule (US/Eastern):
#   Sun:     10 PM
#   Mon-Thu: 8 AM, 11 AM, 3:40 PM, 4 PM, 7:30 PM, 10 PM
#   Fri:     8 AM, 11 AM, 3:40 PM, 4 PM, 7:30 PM
from datetime import datetime, time, timedelta

import pendulum

# Same numbering as datetime.weekday(): Monday == 0 ... Sunday == 6
MON, TUE, WED, THU, FRI, SAT, SUN = range(7)

LOCAL_TZ = pendulum.timezone("US/Eastern")
UTC = pendulum.timezone("UTC")

RUN_TIMES = [
    time(8, 0),
    time(11, 0),
    time(15, 40),
    time(16, 0),
    time(19, 30),
    time(22, 0),
]

DAY_TIMES = {
    SUN: RUN_TIMES[-1:],  # 10 PM only
    MON: RUN_TIMES,
    TUE: RUN_TIMES,
    WED: RUN_TIMES,
    THU: RUN_TIMES,
    FRI: RUN_TIMES[:-1],  # no 10 PM run
}  # SAT: no runs


def intraday_scheduler(dttm, schedule_type, _dag):
    # Look up weekdays and wall-clock times in Eastern time, not UTC: an
    # evening run in New York already falls on the next day in UTC.
    local_day = dttm.astimezone(LOCAL_TZ).date()

    # Collect every run from a week before to a week after dttm. Scanning a
    # window avoids wrap-around errors at the Sunday/Monday boundary and
    # also works when dttm falls on a Saturday, which has no runs.
    all_run_times = []
    for offset in range(-7, 8):
        day = local_day + timedelta(days=offset)
        for t in DAY_TIMES.get(day.weekday(), []):
            # convert() localizes a naive datetime, picking EST or EDT
            local_run = LOCAL_TZ.convert(datetime.combine(day, t))
            all_run_times.append(local_run.astimezone(UTC))
    all_run_times.sort()

    upcoming_time = next(run for run in all_run_times if run > dttm)

    if schedule_type == "following":
        # The assumption here is that if the dttm passed in is later than the
        # latest execution date (or the start date), we are looking for the
        # "right edge" of the interval period. To make it run right away, we
        # make the scheduler believe the period ends at dttm + 1 microsecond.
        # Otherwise, just return the next scheduled time.
        if dttm > (_dag.latest_execution_date or _dag.start_date):
            return dttm + timedelta(microseconds=1)
        return upcoming_time

    if schedule_type == "previous":
        return all_run_times[all_run_times.index(upcoming_time) - 1]

On Thu, Sep 26, 2019 at 6:54 AM James Coder <jcode...@gmail.com> wrote:

> I think that will lead to a very large number of questions about why it worked before and now it doesn't when doing a clean install.
> And additionally, if developing in a new install and deploying to an old install, you would get different behavior, adding to the confusion.
>
> James Coder
>
> > On Sep 26, 2019, at 6:41 AM, Ash Berlin-Taylor <a...@apache.org> wrote:
> >
> > I'm wondering if there is some way we can do this so that new installs will pick up the new default, but anyone that carries over an airflow.cfg from an old install will keep their existing behaviour.
> >
> > And then also, is that a good/helpful idea, or will that be more confusing than not?
> >
> > -a
> >
> >> On 26 Sep 2019, at 11:40, Kaxil Naik <kaxiln...@gmail.com> wrote:
> >>
> >> I definitely agree. If we don't update it in 2.0, it is going to be hard to change that in any 2.x version.
> >>
> >> On Thu, Sep 26, 2019 at 10:51 AM James Meickle <jmeic...@quantopian.com.invalid> wrote:
> >>
> >>> I am *strongly* in favor of using the 2.0 update to break compat here, because this is a very confusing feature to most new users of Airflow, but it will also break a _lot_ of DAGs. I feel like if we don't change this in 2.0, we probably won't for any 2.x either, which would be a shame.
> >>>
> >>> On Wed, Sep 25, 2019 at 8:33 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
> >>>
> >>>> I agree with Dan to change the default execution to the start of the interval.
> >>>>
> >>>> How about adding this for 2.0?
> >>>>
> >>>> Don't want to keep delaying this if we have a consensus already.
> >>>>
> >>>> Regards,
> >>>> Kaxil
> >>>>
> >>>> On Fri, Aug 23, 2019, 15:39 Dan Davydov <ddavy...@twitter.com.invalid> wrote:
> >>>>
> >>>>> What are people's feelings on changing the default execution to the schedule interval start and communicating this to existing users in the Updating notes so that they can preserve the old behavior? It could potentially cause headaches for users who don't read the notes, but I think it might make sense to bite the bullet at some point for more intuitive behavior overall for new users.
> >>>>>
> >>>>> On Fri, Aug 23, 2019 at 10:29 AM Dan Davydov <ddavy...@twitter.com> wrote:
> >>>>>
> >>>>>> I am for this change, since I feel that in general the start of the interval is more intuitive (I have been working on Airflow for 3 years and this still trips me up). That being said, I'm not sure how I feel about allowing customization at the DAG level instead of the cluster level, as it makes it harder to make assumptions about DAGs on the cluster for ops, though maybe this isn't a huge deal given there are tools available that show you why tasks aren't running.
> >>>>>>
> >>>>>> I agree with Bole that we should communicate recommended migration strategies if they can't be done automatically.
> >>>>>>
> >>>>>> I don't think I'm a fan of arbitrary customization of the interval via a callback; my feeling is this would not provide significant value and could be an ops nightmare.
> >>>>>>
> >>>>>> On Fri, Aug 23, 2019 at 9:11 AM Jarek Potiuk <jarek.pot...@polidea.com> wrote:
> >>>>>>
> >>>>>>> DST: I recall problems with DST, especially when the clock goes back and the daily schedule time technically occurs twice on the same day (or, when it goes forward, does not occur at all). We have some code that arbitrarily chooses the first occurrence in the former case (there was a problem where it worked differently in Python 3.6 vs 3.5!). But the case when we move forward is also an interesting one. I am not 100% sure it will work correctly after changing the scheduling mechanisms, but it's rather easy to test and there is no harm in adding it. There is DST-specific logic implemented in our next/previous run calculation, and I imagine it could go wrong.
> >>>>>>>
> >>>>>>> The tests I am talking about: DagTest.test_following_previous_schedule_daily_dag_CEST_to_CET and DagTest.test_following_previous_schedule_daily_dag_CET_to_CEST.
> >>>>>>>
> >>>>>>> Re: arbitrary customisation/converting DAGs: I think there is no need to convert existing DAGs, since the default behaviour remains as it is, as far as I understand. And this flag is much simpler to understand and reason about than an arbitrary function, and it corresponds to real business cases:
> >>>>>>>
> >>>>>>> 1) schedule_at_interval_end = True -> wait for the data to be ready for the interval (current/default behaviour, related to processing batches of data)
> >>>>>>> 2) schedule_at_interval_end = False -> CRON-like behaviour where we simply run an arbitrary operation at regular intervals (more intuitive for people who are used to CRON-like jobs)
> >>>>>>>
> >>>>>>> You can always build your schedule differently if you need something "in-between", IMHO.
> >>>>>>>
> >>>>>>> J.
> >>>>>>>
> >>>>>>> On Fri, Aug 23, 2019 at 8:44 AM James Meickle <jmeic...@quantopian.com.invalid> wrote:
> >>>>>>>
> >>>>>>>> This is a change to one of Airflow's core concepts, and it would require a lot of work for existing DAGs to cut over to it. Given that, my personal preference would be to allow arbitrary customization rather than just a bit toggle, such as allowing passing in a mapping function: given an interval's start date and end date, when should it be executed?
> >>>>>>>>
> >>>>>>>> On Fri, Aug 23, 2019 at 8:24 AM Jarek Potiuk <jarek.pot...@polidea.com> wrote:
> >>>>>>>>
> >>>>>>>>> Happy for it as well. There are a number of cases where scheduling at the start makes more sense, and as we see, Airflow is now used in multiple cases where there is no need to process data from an interval and wait until that data is ready.
> >>>>>>>>> But indeed some more tests would be great, especially for edge cases. Changing mid-air is one, but I think there should be a test for Daylight Saving Time changes. There are some tests for DST, so they just need to be extended to cover those two different cases.
> >>>>>>>>>
> >>>>>>>>> J.
> >>>>>>>>>
> >>>>>>>>> On Fri, Aug 23, 2019 at 7:37 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Happy for this feature to be merged.
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Aug 23, 2019, 11:49 Ash Berlin-Taylor <a...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> This has come up a few times before; someone has now opened a PR that makes this a global+per-DAG setting: https://github.com/apache/airflow/pull/5787. It also includes docs that I think do a good job of illustrating the two modes.
> >>>>>>>>>>>
> >>>>>>>>>>> Does anyone object to this being merged? If no one says anything by midday on Tuesday, I will take that as assent and will merge it.
> >>>>>>>>>>>
> >>>>>>>>>>> The docs from the PR are included below.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Ash
> >>>>>>>>>>>
> >>>>>>>>>>> Scheduled Time vs Execution Time
> >>>>>>>>>>> ''''''''''''''''''''''''''''''''
> >>>>>>>>>>>
> >>>>>>>>>>> A DAG with a ``schedule_interval`` will execute once per interval. By default, the execution of a DAG will occur at the **end** of the schedule interval.
> >>>>>>>>>>>
> >>>>>>>>>>> A few examples:
> >>>>>>>>>>>
> >>>>>>>>>>> - A DAG with ``schedule_interval='@hourly'``: The DAG run that processes 2019-08-16 17:00 will start running just after 2019-08-16 17:59:59, i.e. once that hour is over.
> >>>>>>>>>>> - A DAG with ``schedule_interval='@daily'``: The DAG run that processes 2019-08-16 will start running shortly after 2019-08-17 00:00.
> >>>>>>>>>>>
> >>>>>>>>>>> The reasoning behind this execution vs scheduling behaviour is that data for the interval to be processed won't be fully available until the interval has elapsed.
> >>>>>>>>>>>
> >>>>>>>>>>> In cases where you wish the DAG to be executed at the **start** of the interval, specify ``schedule_at_interval_end=False``, either in ``airflow.cfg`` or on a per-DAG basis.
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Jarek Potiuk
> >>>>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
> >>>>>>>>> M: +48 660 796 129
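The two modes in the quoted docs, and James Meickle's alternative of a mapping function from an interval's start and end to an execution time, can be compared with a toy model. Everything below (Policy, due_time, at_interval_end, at_interval_start) is hypothetical and is not an Airflow API; it only reproduces the @hourly and @daily examples from the docs, where "just after 17:59:59" corresponds to the interval end at 18:00.

```python
from datetime import datetime, timedelta
from typing import Callable

# Hypothetical policy type: (interval_start, interval_end) -> when the run becomes due.
Policy = Callable[[datetime, datetime], datetime]


def at_interval_end(start: datetime, end: datetime) -> datetime:
    """Default mode in the quoted docs: wait until the interval's data is complete."""
    return end


def at_interval_start(start: datetime, end: datetime) -> datetime:
    """schedule_at_interval_end=False mode: CRON-like, run as the interval opens."""
    return start


def due_time(start: datetime, interval: timedelta, policy: Policy = at_interval_end) -> datetime:
    """When the run covering [start, start + interval) should start."""
    return policy(start, start + interval)


HOURLY, DAILY = timedelta(hours=1), timedelta(days=1)

# The two examples from the docs, under the default (end-of-interval) mode:
print(due_time(datetime(2019, 8, 16, 17), HOURLY))  # 2019-08-16 18:00:00
print(due_time(datetime(2019, 8, 16), DAILY))  # 2019-08-17 00:00:00
# Start-of-interval mode, and an arbitrary mapping function (run at midday):
print(due_time(datetime(2019, 8, 16), DAILY, at_interval_start))  # 2019-08-16 00:00:00
print(due_time(datetime(2019, 8, 16), DAILY, lambda s, e: s + (e - s) / 2))  # 2019-08-16 12:00:00
```

Seen this way, the boolean flag is just a choice between two fixed policies, while the callback proposal admits any function of the interval, which is the flexibility (and the operational unpredictability) debated in the thread.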