This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 562357528d Refactor unneeded 'continue' jumps in dag processing (#33839) 562357528d is described below commit 562357528d85a1d1a3054b2f7b88511e1c5337f4 Author: Miroslav Šedivý <6774676+eum...@users.noreply.github.com> AuthorDate: Sun Sep 3 15:59:19 2023 +0000 Refactor unneeded 'continue' jumps in dag processing (#33839) --- airflow/dag_processing/manager.py | 27 +++++++++++---------------- airflow/dag_processing/processor.py | 4 ++-- airflow/example_dags/plugins/workday.py | 17 ++++++++--------- 3 files changed, 21 insertions(+), 27 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 3b26da2c68..be5de2ccb7 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -610,16 +610,12 @@ class DagFileProcessorManager(LoggingMixin): continue for sentinel in ready: - if sentinel is self._direct_scheduler_conn: - continue - - processor = self.waitables.get(sentinel) - if not processor: - continue - - self._collect_results_from_processor(processor) - self.waitables.pop(sentinel) - self._processors.pop(processor.file_path) + if sentinel is not self._direct_scheduler_conn: + processor = self.waitables.get(sentinel) + if processor: + self._collect_results_from_processor(processor) + self.waitables.pop(sentinel) + self._processors.pop(processor.file_path) if self.standalone_dag_processor: self._fetch_callbacks(max_callbacks_per_loop) @@ -1058,12 +1054,11 @@ class DagFileProcessorManager(LoggingMixin): ) for sentinel in ready: - if sentinel is self._direct_scheduler_conn: - continue - processor = cast(DagFileProcessorProcess, self.waitables[sentinel]) - self.waitables.pop(processor.waitable_handle) - self._processors.pop(processor.file_path) - self._collect_results_from_processor(processor) + if sentinel is not self._direct_scheduler_conn: + processor = cast(DagFileProcessorProcess, self.waitables[sentinel]) + self.waitables.pop(processor.waitable_handle) + self._processors.pop(processor.file_path) + self._collect_results_from_processor(processor) self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 1cb9c74a27..e452dc4259 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -561,8 +561,8 @@ class DagFileProcessor(LoggingMixin): cls.logger().warning( "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id ) - continue - tasks_missed_sla.append(task) + else: + tasks_missed_sla.append(task) emails: set[str] = set() for task in tasks_missed_sla: diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py index 79473e06dd..2c3299c960 100644 --- a/airflow/example_dags/plugins/workday.py +++ b/airflow/example_dags/plugins/workday.py @@ -45,15 +45,14 @@ class AfterWorkdayTimetable(Timetable): def get_next_workday(self, d: DateTime, incr=1) -> DateTime: next_start = d while True: - if next_start.weekday() in (5, 6): # If next start is in the weekend go to next day - next_start = next_start + incr * timedelta(days=1) - continue - if holiday_calendar is not None: - holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime() - if next_start in holidays: # If next start is a holiday go to next day - next_start = next_start + incr * timedelta(days=1) - continue - break + if next_start.weekday() not in (5, 6): # not on weekend + if holiday_calendar is None: + holidays = set() + else: + holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime() + if next_start not in holidays: + break + next_start = next_start.add(days=incr) return next_start # [START howto_timetable_infer_manual_data_interval]