This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 04bd99b481d Remove start_from_trigger tests and start_from_trigger is 
not part of 3.2 (#66766)
04bd99b481d is described below

commit 04bd99b481d9f7f1f5b8d2a296476cd628f52b20
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 13:21:23 2026 +0530

    Remove start_from_trigger tests and start_from_trigger is not part of 3.2 
(#66766)
---
 airflow-core/tests/unit/models/test_dagrun.py | 68 ---------------------------
 1 file changed, 68 deletions(-)

diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index b259b62552e..1703ba1023a 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -65,7 +65,6 @@ from airflow.serialization.definitions.deadline import SerializedReferenceModels
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
 from airflow.settings import get_policy_plugin_manager
 from airflow.task.trigger_rule import TriggerRule
-from airflow.triggers.base import StartTriggerArgs
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -2289,40 +2288,6 @@ def 
test_schedule_tis_only_one_scheduler_update_succeeds_when_competing(dag_make
     "(re-enabled on main by #55068, not backported); "
     "tracked at https://github.com/apache/airflow/issues/66307"
 )
-@pytest.mark.need_serialized_dag
-def test_schedule_tis_start_trigger(dag_maker, session):
-    """
-    Test that an operator with start_trigger_args set can be directly deferred during scheduling.
-    """
-
-    class TestOperator(BaseOperator):
-        start_trigger_args = StartTriggerArgs(
-            trigger_cls="airflow.triggers.testing.SuccessTrigger",
-            trigger_kwargs=None,
-            next_method="execute_complete",
-            timeout=None,
-        )
-        start_from_trigger = True
-
-        def __init__(self, *args, **kwargs):
-            super().__init__(*args, **kwargs)
-            self.start_trigger_args.trigger_kwargs = {}
-
-        def execute_complete(self):
-            pass
-
-    with dag_maker(session=session):
-        TestOperator(task_id="test_task")
-
-    dr: DagRun = dag_maker.create_dagrun()
-    ti = dr.get_task_instance("test_task")
-    assert ti.state is None
-
-    ti.task = dr.dag.get_task("test_task")
-    dr.schedule_tis((ti,), session=session)
-    assert ti.state == TaskInstanceState.DEFERRED
-
-
 def test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
     """
     When empty operator is not actually run, then we need to increment the try_number,
@@ -2423,39 +2388,6 @@ def 
test_schedule_tis_try_number_check_is_debug_only(dag_maker, session: Session
     assert select_calls == 0
 
 
-@pytest.mark.skip(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom")
-def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
-    """
-    Test that an operator with start_trigger_args set can be directly deferred during scheduling.
-    """
-
-    class TestOperator(BaseOperator):
-        start_trigger_args = StartTriggerArgs(
-            trigger_cls="airflow.triggers.testing.SuccessTrigger",
-            trigger_kwargs={},
-            next_method="execute_complete",
-            timeout=None,
-        )
-        start_from_trigger = False
-
-        def __init__(self, *args, start_from_trigger: bool = False, **kwargs):
-            super().__init__(*args, **kwargs)
-            self.start_from_trigger = start_from_trigger
-
-        def execute_complete(self):
-            pass
-
-    with dag_maker(session=session):
-        TestOperator.partial(task_id="test_task").expand(start_from_trigger=[True, False])
-
-    dr: DagRun = dag_maker.create_dagrun()
-
-    dr.schedule_tis(dr.task_instances, session=session)
-    tis = [(ti.state, ti.map_index) for ti in dr.task_instances]
-    assert tis[0] == (TaskInstanceState.DEFERRED, 0)
-    assert tis[1] == (None, 1)
-
-
 def test_mapped_expand_kwargs(dag_maker):
     with dag_maker():
 

Reply via email to