kyungjunleeme commented on code in PR #53455:
URL: https://github.com/apache/airflow/pull/53455#discussion_r2213565230


##########
providers/standard/tests/unit/standard/utils/test_skipmixin.py:
##########
@@ -359,3 +359,84 @@ def 
test_raise_exception_on_not_valid_branch_task_ids(self, dag_maker, branch_ta
         error_message = r"'branch_task_ids' must contain only valid task_ids. 
Invalid tasks found: .*"
         with pytest.raises(AirflowException, match=error_message):
             SkipMixin().skip_all_except(ti=ti1, 
branch_task_ids=branch_task_ids)
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Issue only exists in 
Airflow 3.x")
+    def test_ensure_tasks_includes_sensors_airflow_3x(self, dag_maker):
+        """Test that sensors (inheriting from airflow.sdk.BaseOperator) are 
properly handled by _ensure_tasks."""
+        from airflow.providers.standard.utils.skipmixin import _ensure_tasks
+        from airflow.sdk import BaseOperator as SDKBaseOperator
+        from airflow.sdk.bases.sensor import BaseSensorOperator
+
+        class DummySensor(BaseSensorOperator):
+            def __init__(self, **kwargs):
+                super().__init__(**kwargs)
+                self.timeout = 0
+                self.poke_interval = 0
+
+            def poke(self, context):
+                return True
+
+        with dag_maker("dag_test_sensor_skipping", serialized=True) as dag:
+            regular_task = EmptyOperator(task_id="regular_task")
+            sensor_task = DummySensor(task_id="sensor_task")
+            downstream_task = EmptyOperator(task_id="downstream_task")
+
+            regular_task >> [sensor_task, downstream_task]
+
+        dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID)
+
+        downstream_nodes = dag.get_task("regular_task").downstream_list
+        task_list = _ensure_tasks(downstream_nodes)
+
+        # Verify both the regular operator and sensor are included
+        task_ids = [t.task_id for t in task_list]
+        assert "sensor_task" in task_ids, "Sensor should be included in task 
list"
+        assert "downstream_task" in task_ids, "Regular task should be included 
in task list"
+        assert len(task_list) == 2, "Both tasks should be included"
+
+        # Also verify that the sensor is actually an instance of the correct 
BaseOperator
+        sensor_in_list = next((t for t in task_list if t.task_id == 
"sensor_task"), None)
+        assert sensor_in_list is not None, "Sensor task should be found in 
list"
+        assert isinstance(sensor_in_list, SDKBaseOperator), "Sensor should be 
instance of SDK BaseOperator"
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Integration test for 
Airflow 3.x sensor skipping")
+    def test_skip_sensor_in_branching_scenario(self, dag_maker):
+        """Integration test: verify sensors are properly skipped by branching 
operators in Airflow 3.x."""
+        from airflow.sdk.bases.sensor import BaseSensorOperator
+
+        # Create a dummy sensor for testing
+        class DummySensor(BaseSensorOperator):
+            def __init__(self, **kwargs):
+                super().__init__(**kwargs)
+                self.timeout = 0
+                self.poke_interval = 0
+
+            def poke(self, context):
+                return True
+
+        with dag_maker("dag_test_branch_sensor_skipping", serialized=True):

Review Comment:
   ```suggestion
           with dag_maker("dag_test_branch_sensor_skipping"):
   ```
   
   You have to remove `serialized=True` because `_ensure_tasks()` expects 
instances of BaseOperator, so it does not work properly with serialized DAGs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to