This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 562357528d Refactor unneeded 'continue' jumps in dag processing (#33839)
562357528d is described below

commit 562357528d85a1d1a3054b2f7b88511e1c5337f4
Author: Miroslav Šedivý <6774676+eum...@users.noreply.github.com>
AuthorDate: Sun Sep 3 15:59:19 2023 +0000

    Refactor unneeded 'continue' jumps in dag processing (#33839)
---
 airflow/dag_processing/manager.py       | 27 +++++++++++----------------
 airflow/dag_processing/processor.py     |  4 ++--
 airflow/example_dags/plugins/workday.py | 17 ++++++++---------
 3 files changed, 21 insertions(+), 27 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3b26da2c68..be5de2ccb7 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -610,16 +610,12 @@ class DagFileProcessorManager(LoggingMixin):
                 continue
 
             for sentinel in ready:
-                if sentinel is self._direct_scheduler_conn:
-                    continue
-
-                processor = self.waitables.get(sentinel)
-                if not processor:
-                    continue
-
-                self._collect_results_from_processor(processor)
-                self.waitables.pop(sentinel)
-                self._processors.pop(processor.file_path)
+                if sentinel is not self._direct_scheduler_conn:
+                    processor = self.waitables.get(sentinel)
+                    if processor:
+                        self._collect_results_from_processor(processor)
+                        self.waitables.pop(sentinel)
+                        self._processors.pop(processor.file_path)
 
             if self.standalone_dag_processor:
                 self._fetch_callbacks(max_callbacks_per_loop)
@@ -1058,12 +1054,11 @@ class DagFileProcessorManager(LoggingMixin):
         )
 
         for sentinel in ready:
-            if sentinel is self._direct_scheduler_conn:
-                continue
-            processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
-            self.waitables.pop(processor.waitable_handle)
-            self._processors.pop(processor.file_path)
-            self._collect_results_from_processor(processor)
+            if sentinel is not self._direct_scheduler_conn:
+                processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
+                self.waitables.pop(processor.waitable_handle)
+                self._processors.pop(processor.file_path)
+                self._collect_results_from_processor(processor)
 
         self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism)
 
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 1cb9c74a27..e452dc4259 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -561,8 +561,8 @@ class DagFileProcessor(LoggingMixin):
                     cls.logger().warning(
                         "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id
                     )
-                    continue
-                tasks_missed_sla.append(task)
+                else:
+                    tasks_missed_sla.append(task)
 
             emails: set[str] = set()
             for task in tasks_missed_sla:
diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py
index 79473e06dd..2c3299c960 100644
--- a/airflow/example_dags/plugins/workday.py
+++ b/airflow/example_dags/plugins/workday.py
@@ -45,15 +45,14 @@ class AfterWorkdayTimetable(Timetable):
     def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
         next_start = d
         while True:
-            if next_start.weekday() in (5, 6):  # If next start is in the weekend go to next day
-                next_start = next_start + incr * timedelta(days=1)
-                continue
-            if holiday_calendar is not None:
-                holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
-                if next_start in holidays:  # If next start is a holiday go to next day
-                    next_start = next_start + incr * timedelta(days=1)
-                    continue
-            break
+            if next_start.weekday() not in (5, 6):  # not on weekend
+                if holiday_calendar is None:
+                    holidays = set()
+                else:
+                    holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
+                if next_start not in holidays:
+                    break
+            next_start = next_start.add(days=incr)
         return next_start
 
     # [START howto_timetable_infer_manual_data_interval]

Reply via email to