My understanding here is that some users just wants to trigger a dag at the
time they prefer rather than always at the end of schedule_interval? I saw
two use cases in the thread:
1. The usecase from Daniel wants to trigger at start of schedule_interval
2. The usecase from Shaw has some holidays to skip

For both usecases, has anyone considered setting schedule_interval to None
and use a TriggerDagRunOperator to trigger the DAG instead? If you add the
following entry to the [scheduler] section of airflow.cfg, you can
literally trigger the DAG at any time or interval using any logic you like.

# Allow externally triggered DagRuns for Execution Dates in the future
# Only has effect if schedule_interval is set to None in DAG
# We set this to True so that tasks on the DagRun with execution_date T can
start running before midnight in UTC on date T.
allow_trigger_in_future = True


Thanks,
Qian

On Mon, May 18, 2020 at 1:07 AM Bas Harenslak
<basharens...@godatadriven.com.invalid> wrote:

> My 2c: adding an option to schedule at the start of an interval is yet
> another option to take in, adding more complexity. Therefore I’m not in
> favour of it.
>
> The scheduling at start/end has often been a discussed, IMO it's a fact
> which one has to know when learning Airflow. But not something that is
> either good or bad, therefore changing it would introduce confusion.
>
> What is bad IMO is adding more options to change the default behaviour,
> and make Airflow even more complex to comprehend. The core functionality
> should be as simple as possible.
>
> Besides the start/end discussion, @Damian, the scheduling you described
> there seems like a valid use case. I’m thinking something like a
> “scheduling blacklist” would be a nice feature here. Could you create a
> GitHub issue for that?
>
> Bas
>
> On 11 May 2020, at 18:17, Shaw, Damian P. <damian.sha...@credit-suisse.com
> <mailto:damian.sha...@credit-suisse.com>> wrote:
>
>
> I strongly agree with reducing Airflows complexity and the plan to
> serialize DAGs by default makes a lot of sense. So wouldn't it make sense
> to also serialize the schedule of a DAG?
> e.g. Provide a table of execution dates that the scheduler can keep
> upcoming ones cached and periodically check for update, and give the user a
> way to update it if they need to.
>
> Outside customization Airflows current schedule definition solution is
> fairly weak.
>
> For example in the very common use case of following a countries working
> day calendar it's actually pretty tricky for a new user to implement in
> Airflow. Say you want to follow a fairly normal east Asian Calendar, the
> current way recommended way to do this in Airflow is the following:
>
> 1. Create a DAG schedule Monday - Friday
> 2. Write a Holiday Branch / Short Circuit Operator to skip tasks on a
> Holiday, put inside your own custom Airflow Operators package and
> distribute with every Airflow instance
> 3. Write a function to calculate the actual "{{ next_ds }}" as the one
> Airflow provides is normalized to UTC and therefore is usually 1 day out
> for DAG aware timezone in Asia, put it in the Macro Plugins
> 4. Write functions that can calculate the actual "{{ prev_ds }}" and "{{
> ds }}" based on skipping Holidays for that calendar and the timezone the
> DAG is in, put them in the Macro Plugins
> 5. For every DAG make sure you correctly upstream the Holiday Branch
> Operator and be very careful to only ever use the custom datetime macros
> that you've writen
>
> And that is the most simple use case, in the real world you also need to
> deal with weekends that fall on different days, "reverse holiday weekends"
> where days that are usually a weekend become a working day (a real thing in
> India), schedules that follow things like "the 2nd Monday after Easter" or
> "every 15 minutes between 9am and 5am on a working day and every hour
> outside that time".  I've handled all such situations but I've really felt
> like I've had to fight to get what I need out of Airflow, which in
> retrospect is odd for a tool that is designed to schedule workflows.
>
> Anyway my 2 cents as an Airflow user.
>
> Regards
> Damian
>
> -----Original Message-----
> From: Dan Davydov <ddavy...@twitter.com.INVALID<mailto:
> ddavy...@twitter.com.INVALID>>
> Sent: Monday, May 11, 2020 11:22
> To: dev@airflow.apache.org<mailto:dev@airflow.apache.org>
> Subject: Re: Setting to add choice of schedule at end or schedule at start
> of interval
>
> I strongly agree with Ash, I also think we should strive to decrease the
> complexity of core Airflow components and not offer
> customization/extensibility especially in the form of plugins where it is
> not needed to make Airflow more robust and easier to reason about (less
> testing configuration). I think there is some potential one-time migration
> pain we need to go through here but would prefer a simpler solution here
> (something similar to the original github PR that was raised to resolve
> this issue, having trouble finding it at the moment), with other things
> like data lineage being addressed in separate features to keep the scope
> down.
>
> On Mon, May 11, 2020 at 6:15 AM Ash Berlin-Taylor <a...@apache.org<mailto:
> a...@apache.org>> wrote:
>
> Ash, you had mentioned something about some plans that were in
> conflict with the above hack.... could you maybe share a thought or
> two about what you were thinking?
>
>
> The main thing here is around scheduler HA, and I want the scheduler
> to be able to make all scheduling decisions without needing to load
> any user-provided code.
>
> i.e. that A "dag serializer" process can be the only thing in the
> scheduler that loads/executes user code, and all other scheduling
> decisions operate on the (de)serialized version..
>
> -ash
>
> On May 6 2020, at 7:21 pm, Daniel Standish <dpstand...@gmail.com<mailto:
> dpstand...@gmail.com>> wrote:
>
> Inspired by James, I tried this out...
>
> For others interested, here is sample dag to test it out:
>
> class MyDAG(DAG):
>   def following_schedule(self, dttm):
>       pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
>       minutes = pen_dt.minute
>       minutes_mod = minutes % 10
>       if minutes_mod < 5:
>           return pen_dt.add(minutes=1)
>       else:
>           return pen_dt.add(minutes=10 - minutes_mod)
>
>   def previous_schedule(self, dttm):
>       pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
>       minutes = pen_dt.minute
>       minutes_mod = minutes % 10
>       if minutes_mod < 5:
>           return pen_dt.add(minutes=-1)
>       else:
>           return pen_dt.add(minutes=-(minutes_mod - 5))
>
>
> dag = MyDAG(
>   dag_id='test_schd',
>   default_args=default_args,
>   schedule_interval='@daily',
>   catchup=True,
>   concurrency=1000,
>   max_active_runs=10,
> )
>
>
> with dag:
>   DummyOperator(task_id='test', task_concurrency=1000)
>
> What this will do is trigger one run for every minute when minutes
> (mod 10) is between 0-4 but not schedule anything between 5-9 *(or
> something like that, i did not scrutinize the edges carefully)*.
>
> But...  anyway, it'll prove that it works relatively quickly and
> that's the point.
>
> I have a use case.  I think i might use it, rather than adding
> branching logic.  It's ugly, but they are both ugly.
>
> *Question*
>
> What do people think about a Schedule abstraction that takes the
> previous_schedule and following_schedule methods from dag (perhaps
> rename to previous_execution following_execution?)
>
> Then I imagine we could do this:
> * deprecate the `schedule_interval` param
> * rename schedule_interval to ``schedule: Union[Schedule, str,
> timedelta]``
> and preserve backward compatibility
> * if str or timedelta is given, we instantiate a suitable Schedule
> object.
> Perhaps there is a CronSchedule and a TimedeltaSchedule.
>
> Any interest?
>
> Ash, you had mentioned something about some plans that were in
> conflict with the above hack.... could you maybe share a thought or
> two about what you were thinking?
>
> *Another idea*
>
> If we could maybe leave it to the `Schedule` class to decide the
> relationship between run time and "execution_date".  There is the
> "interval
> edge PR"...  But maybe there would be an elegant way to control that
> behavior in the Schedule class.  Perhaps simply a class constant, or
> param.
>
>
>
>
>
>
> ===============================================================================
> Please access the attached hyperlink for an important electronic
> communications disclaimer:
> http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
>
> ===============================================================================
>
>

Reply via email to