BasPH opened a new issue #21471: URL: https://github.com/apache/airflow/issues/21471
### Apache Airflow version

2.2.3 (latest released)

### What happened

Was testing custom Timetables and found that scheduled task instances fail in Airflow 2.2.3, but succeed prior to 2.2.3.

Observed behaviour:

- Have a DAG with custom timetable and `catchup=True`
- First tried my own Timetable, ended up copy-pasting the [example Timetable in the docs](https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html), but either way all scheduled task instances fail on me.
- Manually triggered runs work fine.
- DAG run info (start/end date) is accurate
- But task instances default to failed, without running
- No errors in any logfile
- See screenshot, all failed TIs are scheduled, the succeeded TI was manual. Hovering over the DAG run shows the correct start/end datetime, but the TIs are empty.
- In the UI, TIs go from queued -> failed. No logs are displayed, and there are no log files for those TIs on disk.
- In the DB, the scheduled TIs have no start_date, the manual TIs do have a start_date.

### What you expected to happen

I expect a custom timetable to work for scheduled intervals.

### How to reproduce

Run the Timetable in the docs on Airflow 2.2.3:

```python
from datetime import timedelta
from typing import Optional

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import Date, DateTime, Time, timezone

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            days_since_friday = (run_after.weekday() - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            last_start_weekday = last_start.weekday()
            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Last run on Friday -- skip to next Monday.
                delta = timedelta(days=(7 - last_start_weekday))
            next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            elif next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_day = next_start.date() + timedelta(days=1)
                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
            next_start_weekday = next_start.weekday()
            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - next_start_weekday))
                next_start = next_start + delta
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]
```

DAG:

```python
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from after_workday_timetable import AfterWorkdayTimetable


def print_interval_info(data_interval_start, data_interval_end, **context):
    print("===================================================")
    print(f"START = {data_interval_start} ({data_interval_start.format('dddd')})")
    print(f"END = {data_interval_end} ({data_interval_end.format('dddd')})")
    print(context)
    print("===================================================")


with DAG(
    dag_id="timetable_demo",
    start_date=datetime.datetime(2022, 1, 1),
    timetable=AfterWorkdayTimetable(),
    catchup=True,
) as dag:
    test = PythonOperator(task_id="test", python_callable=print_interval_info)
```

### Operating System

Debian Bullseye

### Versions of Apache Airflow Providers

```
apache-airflow-providers-amazon==2.4.0
apache-airflow-providers-celery==2.1.0
apache-airflow-providers-cncf-kubernetes==2.2.0
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-postgres==2.4.0
apache-airflow-providers-redis==2.0.1
apache-airflow-providers-sqlite==2.0.1
```

### Deployment

Docker-Compose

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
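
### Expected schedule for the repro DAG (sketch)

To make it easier to compare the DAG runs against what the timetable should produce, here is a rough standard-library sketch of the `catchup=True` path of `next_dagrun_info`. It does not import Airflow or pendulum, and `next_workday_start` is a hypothetical helper written only for this illustration, not an Airflow API. It assumes UTC throughout and ignores `restriction.latest`.

```python
from datetime import datetime, time, timedelta, timezone
from typing import Optional


def next_workday_start(last_start: Optional[datetime], earliest: datetime) -> datetime:
    """Hypothetical helper: mirrors the timetable's rule for catchup=True."""
    if last_start is not None:
        weekday = last_start.weekday()
        # Monday through Thursday: next interval starts tomorrow.
        # Friday: skip ahead to the following Monday.
        delta = timedelta(days=1) if 0 <= weekday < 4 else timedelta(days=7 - weekday)
        return datetime.combine((last_start + delta).date(), time.min, tzinfo=timezone.utc)

    # First run on the regular schedule: start from the DAG's start_date.
    next_start = earliest
    if next_start.time() != time.min:
        # Not at midnight, so move to the start of the next day.
        next_day = next_start.date() + timedelta(days=1)
        next_start = datetime.combine(next_day, time.min, tzinfo=timezone.utc)
    weekday = next_start.weekday()
    if weekday in (5, 6):
        # Weekend, so move to the next Monday.
        next_start += timedelta(days=7 - weekday)
    return next_start


# start_date of the repro DAG; 2022-01-01 falls on a Saturday.
earliest = datetime(2022, 1, 1, tzinfo=timezone.utc)

starts = []
last = None
for _ in range(6):
    last = next_workday_start(last, earliest)
    starts.append(last)

for start in starts:
    print(start.date().isoformat(), "->", (start + timedelta(days=1)).date().isoformat())
```

Under these assumptions the first six data intervals start on 2022-01-03 through 2022-01-07 and then 2022-01-10, which matches the start/end datetimes shown on the DAG runs. That is consistent with the timetable logic itself being fine and the failure happening at the task instance level.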
