This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6229c1c98a Fix DateTimeOperator tests in Database Isolation Mode (#41271)
6229c1c98a is described below

commit 6229c1c98a2a78d00255d6b6c5b70d032cff9b80
Author: Jens Scheffler <95105677+jsche...@users.noreply.github.com>
AuthorDate: Tue Aug 6 09:17:09 2024 +0200

    Fix DateTimeOperator tests in Database Isolation Mode (#41271)
    
    * Fix DateTimeOperator tests in Database Isolation Mode
    
    * Review Feedback
---
 tests/operators/test_datetime.py | 41 ++++++++++++++++++++--------------------
 1 file changed, 20 insertions(+), 21 deletions(-)

diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py
index 485897acc6..e4294a8856 100644
--- a/tests/operators/test_datetime.py
+++ b/tests/operators/test_datetime.py
@@ -23,7 +23,6 @@ import pytest
 import time_machine
 
 from airflow.exceptions import AirflowException
-from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.operators.datetime import BranchDateTimeOperator
@@ -52,30 +51,30 @@ class TestBranchDateTimeOperator:
         (datetime.time(10, 0, 0), datetime.datetime(2020, 7, 7, 11, 0, 0)),
     ]
 
-    def setup_method(self):
-        self.dag = DAG(
+    @pytest.fixture(autouse=True)
+    def base_tests_setup(self, dag_maker):
+        with dag_maker(
             "branch_datetime_operator_test",
             default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
             schedule=INTERVAL,
-        )
-
-        self.branch_1 = EmptyOperator(task_id="branch_1", dag=self.dag)
-        self.branch_2 = EmptyOperator(task_id="branch_2", dag=self.dag)
+            serialized=True,
+        ) as dag:
+            self.dag = dag
+            self.branch_1 = EmptyOperator(task_id="branch_1")
+            self.branch_2 = EmptyOperator(task_id="branch_2")
 
-        self.branch_op = BranchDateTimeOperator(
-            task_id="datetime_branch",
-            follow_task_ids_if_true="branch_1",
-            follow_task_ids_if_false="branch_2",
-            target_upper=datetime.datetime(2020, 7, 7, 11, 0, 0),
-            target_lower=datetime.datetime(2020, 7, 7, 10, 0, 0),
-            dag=self.dag,
-        )
+            self.branch_op = BranchDateTimeOperator(
+                task_id="datetime_branch",
+                follow_task_ids_if_true="branch_1",
+                follow_task_ids_if_false="branch_2",
+                target_upper=datetime.datetime(2020, 7, 7, 11, 0, 0),
+                target_lower=datetime.datetime(2020, 7, 7, 10, 0, 0),
+            )
 
-        self.branch_1.set_upstream(self.branch_op)
-        self.branch_2.set_upstream(self.branch_op)
-        self.dag.clear()
+            self.branch_1.set_upstream(self.branch_op)
+            self.branch_2.set_upstream(self.branch_op)
 
-        self.dr = self.dag.create_dagrun(
+        self.dr = dag_maker.create_dagrun(
             run_id="manual__",
             start_date=DEFAULT_DATE,
             execution_date=DEFAULT_DATE,
@@ -231,11 +230,11 @@ class TestBranchDateTimeOperator:
         targets,
     )
     @time_machine.travel("2020-12-01 09:00:00")
-    def test_branch_datetime_operator_use_task_logical_date(self, target_lower, target_upper):
+    def test_branch_datetime_operator_use_task_logical_date(self, dag_maker, target_lower, target_upper):
         """Check if BranchDateTimeOperator uses task execution date"""
         in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
         self.branch_op.use_task_logical_date = True
-        self.dr = self.dag.create_dagrun(
+        self.dr = dag_maker.create_dagrun(
             run_id="manual_exec_date__",
             start_date=in_between_date,
             execution_date=in_between_date,

Reply via email to