Respectfully, I strongly disagree with the renaming of Timetable to
Schedule. Schedule and Scheduler aren't meaningfully different, which can
lead to a lot of confusion. Even as a native English speaker, and someone
who works on Airflow full time, I routinely need to ask for clarification
about what schedule-related concept someone is referring to. I foresee
Schedule and Scheduler as two distinct yet closely related concepts
becoming a major source of confusion.

If folks dislike Timetable, we could certainly change to something else,
but let's not use something so similar to existing Airflow classes.

-James

On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <[email protected]> wrote:

> Summary of changes so far on the AIP:
>
> My proposed rename of DagRun.execution_date is now DagRun.schedule_date
> (previously I had proposed run_date. Thanks dstandish!)
>
> Timetable classes are renamed to Schedule classes (CronSchedule etc),
> similarly the DAG argument is now schedule (reminder: schedule_interval
> will not be removed or deprecated, and will still be the way to use
> "simple" expressions)
>
> -ash
>
> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <[email protected]> wrote:
>
> Could change Timetable To Schedule -- that would mean the DAG arg becomes
> `schedule=CronSchedule(...)` -- a bit close to the current
> `schedule_interval` but I think clear enough difference.
>
> I do like the name but my one worry with "schedule" is that Scheduler and
> Schedule are very similar, and might be be confused with each other for
> non-native English speakers? (I defer to others' judgment here, as this is
> not something I can experience myself.)
>
> @Kevin Yang <[email protected]> @Daniel Standish <[email protected]> any
> final input on this AIP?
>
>
>
> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <[email protected]> wrote:
>
> Hi Ash and all,
>
>
> What do people think of this? Worth it? Too complex to reason about what
>> context variables might exist as a result?
>
>
> I think I wouldn't worry about it right now or maybe not as part of this
> AIP. Currently, in one of the Github Issue, a user mentioned that it is not
> straightforward to know what is inside the context dictionary-
> https://github.com/apache/airflow/issues/14396. So maybe we can tackle
> this issue separately once the AbstractTimetable is built.
>
> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs
>> "DataTimetable") have data_interval_start and end available in the context?
>
>
> hmm.. I would say No but then it contradicts my suggestion to remove
> context dict from this AIP. If we are going to use it in scheduler then
> yes, where data_interval_start = data_interval_end from CronTimetable.
>
> Does anyone have any better names than TimeDeltaTimetable, DataTimetable,
>> and CronTimetable? (We can probably change these names right up until
>> release, so not important to get this correct *now*.)
>
>
> No strong opinion here. Just an alternate suggestion can
> be TimeDeltaSchedule, DataSchedule and CronSchedule
>
>
>> Should I try to roll AIP-30
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>  in
>> to this, or should we make that a future addition? (My vote is for future
>> addition)
>
>
> I would vote for Future addition too.
>
> Regards,
> Kaxil
>
> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <[email protected]> wrote:
>
>> I think, yes, AIP-35 or something like it would happily co-exist with
>> this proposal.
>>
>> @Daniel <[email protected]> and I have been discussing this a bit on
>> Slack, and one of the questions he asked was if the concept of
>> data_interval should be moved from DagRun as James and I suggested down on
>> to the individual task:
>>
>> suppose i have a new dag hitting 5 api endpoints and pulling data to s3.
>> suppose that yesterday 4 of them succeeded but one failed. today, 4 of them
>> should pull from yesterday. but the one that failed should pull from 2 days
>> back. so even though these normally have the same interval, today they
>> should not.
>>
>>
>> My view on this is two fold: one, this should primarily be handled by
>> retries on the task, and secondly, having different TaskIstances in the
>> same DagRun  have different data intervals would be much harder to reason
>> about/design the UI around, so for those reasons I still think interval
>> should be a DagRun-level concept.
>>
>> (He has a stalled AIP-30 where he proposed something to address this kind
>> of "watermark" case, which we might pick up next after this AIP is complete)
>>
>> One thing we might want to do is extend the interface of
>> AbstractTimetable to be able to add/update parameters in the context dict,
>> so the interface could become this:
>>
>> class AbstractTimetable(ABC):
>>     @abstractmethod
>>     def next_dagrun_info(
>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>
>>         session: Session,
>>     ) -> Optional[DagRunInfo]:
>>         """
>>         Get information about the next DagRun of this dag after
>> ``date_last_automated_dagrun`` -- the
>>         execution date, and the earliest it could be scheduled
>>
>>         :param date_last_automated_dagrun: The max(execution_date) of
>> existing
>>             "automated" DagRuns for this dag (scheduled or backfill, but
>> not
>>             manual)
>>         """
>>
>>     @abstractmethod
>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str,
>> Any]) -> None:
>>         """
>>         Update or set new context variables to become available in task
>> templates and operators.
>>         """
>>
>>
>> What do people think of this? Worth it? Too complex to reason about what
>> context variables might exist as a result?
>>
>> *Outstanding question*:
>>
>>    - Should "interval-less DAGs" (ones using "CronTimetable" in my
>>    proposal vs "DataTimetable") have data_interval_start and end available in
>>    the context?
>>    - Does anyone have any better names than TimeDeltaTimetable,
>>    DataTimetable, and CronTimetable? (We can probably change these names 
>> right
>>    up until release, so not important to get this correct *now*.)
>>    - Should I try to roll AIP-30
>>    
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>    in to this, or should we make that a future addition? (My vote is for
>>    future addition)
>>
>>
>> I'd like to start voting on this AIP next week (probably on Tuesday) as I
>> think this will be a powerful feature that eases confusing to new users.
>>
>> -Ash
>>
>>
>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <[email protected]> wrote:
>>
>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To
>> Airflow"?
>> I think streaming was also discussed there (though it wasn't really the
>> use case).
>>
>>
>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <[email protected]>:
>>
>> Hi Kevin,
>>
>> Interesting idea. My original idea was actually for "interval-less DAGs"
>> (i.e. ones where it's just "run at this time") would not have
>> data_interval_start or end, but (while drafting the AIP) we decided that it
>> was probably "easier" if those values were always datetimes.
>>
>> That said, I think having the DB model have those values be nullable
>> would future proof it without needing another migration to change it. Do
>> you think this is worth doing now?
>>
>> I haven't (yet! It's on my list) spent any significant time thinking
>> about how to make Airflow play nicely with streaming jobs. If anyone else
>> has ideas here please share them
>>
>> -ash
>>
>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <[email protected]> wrote:
>>
>> Hi Ash and James,
>>
>> This is an exciting move. What do you think about using this opportunity
>> to extend Airflow's support to streaming like use cases? I.e DAGs/tasks
>> that want to run forever like a service. For such use cases, schedule
>> interval might not be meaningful, then do we want to make the date interval
>> param optional to DagRun and task instances? That sounds like a pretty
>> major change to the underlying model of Airflow, but this AIP is so far the
>> best opportunity I saw that can level up Airflow's support for
>> streaming/service use cases.
>>
>>
>> Cheers,
>> Kevin Y
>>
>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <[email protected]>
>> wrote:
>>
>> Very excited to see this proposal come through and love the direction
>> this has gone.
>>
>> Couple comments...
>>
>> *Tree view / Data completeness view*
>>
>> When you design your tasks with the canonical idempotence pattern, the
>> tree view shows you both data completeness and task execution history
>> (success / failure etc).
>>
>> When you don't use that pattern (which is my general preference), tree
>> view is only task execution history.
>>
>> This change has the potential to unlock a data completeness view for
>> canonical tasks.  It's possible that the "data completeness view" can
>> simply be the tree view.  I.e. somehow it can use these new classes to know
>> what data was successfully filled and what data wasn't.
>>
>> To the extent we like the idea of either extending / plugging / modifying
>> tree view, or adding a distinct data completeness view, we might want to
>> anticipate the needs of that in this change.  And maybe no alteration to
>> the proposal would be needed but just want to throw the idea out there.
>>
>> *Watermark workflow / incremental processing*
>>
>> A common pattern in data warehousing is pulling data incrementally from a
>> source.
>>
>> A standard way to achieve this is at the start of the task, select max
>> `updated_at` in source table and hold on to that value for a minute.  This
>> is your tentative new high watermark.
>> Now it's time to pull your data.  If your task ran before, grab last high
>> watermark.  If not, use initial load value.
>> If successful, update high watermark.
>>
>> On my team we implemented this with a stateful tasks / stateful processes
>> concept (there's a dormant draft AIP here
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
>> and a WatermarkOperator that handled the boilerplate*.
>>
>> Again here, I don't have a specific suggestion at this moment.  But I
>> wanted to articulate this workflow because it is common and it wasn't
>> immediately obvious to me in reading AIP-39 how I would use it to implement
>> it.
>>
>> AIP-39 makes airflow more data-aware.  So if it can support this kind of
>> workflow that's great.  @Ash Berlin-Taylor <[email protected]> do you
>> have thoughts on how it might be compatible with this kind of thing as is?
>>
>> ---
>>
>> * The base operator is designed so that Subclasses only need to implement
>> two methods:
>>     - `get_high_watermark`: produce the tentative new high watermark
>>     ' `watermark_execute`: analogous to implementing poke in a sensor,
>> this is where your work is done. `execute` is left to the base class, and
>> it orchestrates (1) getting last high watermark or inital load value and
>> (2) updating new high watermark if job successful.
>>
>>

Reply via email to