This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b66bf510ab2 Remove further findings from positional session check 
(#67712)
b66bf510ab2 is described below

commit b66bf510ab2f1e7f6e407ec75233acae81c48d0c
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri May 29 21:12:02 2026 +0200

    Remove further findings from positional session check (#67712)
    
    * Remove further findings from positional session check
    
    * Fix pytests
---
 .../airflow/providers/amazon/aws/triggers/emr.py   |  2 +-
 .../kubernetes/executors/kubernetes_executor.py    |  1 +
 .../cncf/kubernetes/template_rendering.py          |  3 +-
 .../providers/cncf/kubernetes/triggers/pod.py      |  2 +-
 .../unit/common/ai/plugins/test_hitl_review.py     | 12 +++----
 .../databricks/plugins/databricks_workflow.py      |  4 +--
 .../databricks/plugins/test_databricks_workflow.py |  2 +-
 .../cli_commands/permissions_command.py            |  4 +--
 .../cli_commands/test_permissions_command.py       | 10 +++---
 .../providers/google/cloud/triggers/bigquery.py    |  2 +-
 .../providers/google/cloud/triggers/dataproc.py    |  6 ++--
 .../airflow/providers/openlineage/utils/utils.py   |  2 +-
 .../providers/standard/operators/trigger_dagrun.py |  2 +-
 .../providers/standard/sensors/external_task.py    |  6 ++--
 .../providers/standard/utils/sensor_helper.py      |  1 +
 .../standard/sensors/test_external_task_sensor.py  | 42 +++++++++++-----------
 .../ci/prek/known_provide_session_positional.txt   | 14 --------
 17 files changed, 54 insertions(+), 61 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py 
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py
index 3c4614be857..c98641b0c6b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py
@@ -419,7 +419,7 @@ class EmrServerlessStartJobTrigger(AwsBaseWaiterTrigger):
     if not AIRFLOW_V_3_0_PLUS:
 
         @provide_session
-        def get_task_instance(self, session: Session) -> TaskInstance:
+        def get_task_instance(self, *, session: Session) -> TaskInstance:
             """Get the task instance for the current trigger (Airflow 2.x 
compatibility)."""
             from sqlalchemy import select
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index d109ac097dd..481960b71d2 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -402,6 +402,7 @@ class KubernetesExecutor(BaseExecutor):
     def _change_state(
         self,
         results: KubernetesResults,
+        *,
         session: Session = NEW_SESSION,
     ) -> None:
         """Change state of the task based on KubernetesResults."""
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
index ed9a265728c..6920e8400f2 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
@@ -85,8 +85,9 @@ def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict 
| None:
     return sanitized_pod
 
 
+# Note: AIRFLOW_V_3_0_PLUS - this is only needed for Airflow 2.x, can be 
removed if the support is dropped
 @provide_session
-def get_rendered_k8s_spec(task_instance: TaskInstance, session=NEW_SESSION) -> 
dict | None:
+def get_rendered_k8s_spec(task_instance: TaskInstance, *, session=NEW_SESSION) 
-> dict | None:
     """Fetch rendered template fields from DB."""
     from airflow.models.renderedtifields import RenderedTaskInstanceFields
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 65b1b45bb04..207e97280a5 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -366,7 +366,7 @@ class KubernetesPodTrigger(BaseTrigger):
     if not AIRFLOW_V_3_0_PLUS:
 
         @provide_session
-        def get_task_instance(self, session: Session) -> TaskInstance:
+        def get_task_instance(self, *, session: Session) -> TaskInstance:
             """Get the task instance for this trigger from the database 
(Airflow 2.x only)."""
             task_instance = session.scalar(
                 select(TaskInstance).where(
diff --git 
a/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py 
b/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py
index 1d5ba3c83c1..1f10af77835 100644
--- a/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py
+++ b/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py
@@ -88,7 +88,6 @@ def _clear_db():
 
 @provide_session
 def _create_hitl_session(
-    session=None,
     *,
     dag_id=TEST_DAG_ID,
     run_id=TEST_RUN_ID,
@@ -98,6 +97,7 @@ def _create_hitl_session(
     iteration=1,
     prompt="Summarize",
     current_output="Initial output",
+    session=None,
 ):
     """Create HITL session and output XCom entries in the database."""
     sess = AgentSessionData(
@@ -730,11 +730,11 @@ class TestBuildSessionResponse:
         dag_maker.create_dagrun(run_id=TEST_RUN_ID, 
run_type=DagRunType.MANUAL, logical_date=logical_date)
         dag_maker.sync_dagbag_to_db()
         _create_hitl_session(
-            session=session,
             status=status,
             iteration=iteration,
             prompt=prompt,
             current_output=current_output,
+            session=session,
         )
         session.commit()
 
@@ -757,11 +757,11 @@ class TestBuildSessionResponse:
         dag_maker.create_dagrun(run_id=TEST_RUN_ID, 
run_type=DagRunType.MANUAL, logical_date=logical_date)
         dag_maker.sync_dagbag_to_db()
         _create_hitl_session(
-            session=session,
             status=SessionStatus.PENDING_REVIEW,
             iteration=1,
             prompt="Summarize",
             current_output="First output",
+            session=session,
         )
         session.commit()
 
@@ -785,11 +785,11 @@ class TestBuildSessionResponse:
         dag_maker.create_dagrun(run_id=TEST_RUN_ID, 
run_type=DagRunType.MANUAL, logical_date=logical_date)
         dag_maker.sync_dagbag_to_db()
         _create_hitl_session(
-            session=session,
             status=SessionStatus.PENDING_REVIEW,
             iteration=1,
             prompt="p",
             current_output="Initial",
+            session=session,
         )
         XComModel.set(
             key=f"{XCOM_HUMAN_FEEDBACK_PREFIX}1",
@@ -986,11 +986,11 @@ class TestFindSessionEndpoint:
     def test_returns_max_iterations_exceeded_status(self, test_client, 
session, dag_maker):
         """Find endpoint returns max_iterations_exceeded when session has that 
status."""
         _create_hitl_session(
-            session=session,
             status=SessionStatus.MAX_ITERATIONS_EXCEEDED,
             iteration=5,
             prompt="p",
             current_output="output",
+            session=session,
         )
         session.commit()
 
@@ -1012,11 +1012,11 @@ class TestFindSessionEndpoint:
     def test_returns_timeout_exceeded_status(self, test_client, session, 
dag_maker):
         """Find endpoint returns timeout_exceeded when session has that 
status."""
         _create_hitl_session(
-            session=session,
             status=SessionStatus.TIMEOUT_EXCEEDED,
             iteration=2,
             prompt="p",
             current_output="output",
+            session=session,
         )
         session.commit()
 
diff --git 
a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
 
b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
index d50acc800e7..756bbf6bdee 100644
--- 
a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
+++ 
b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
@@ -164,7 +164,7 @@ if not AIRFLOW_V_3_0_PLUS:
 
     @provide_session
     def _clear_task_instances(
-        dag_id: str, run_id: str, task_ids: list[str], log: Logger, session: 
Session = NEW_SESSION
+        dag_id: str, run_id: str, task_ids: list[str], log: Logger, *, 
session: Session = NEW_SESSION
     ) -> None:
         dag = _get_dag(dag_id, session=session)
         log.debug("task_ids %s to clear", str(task_ids))
@@ -173,7 +173,7 @@ if not AIRFLOW_V_3_0_PLUS:
         clear_task_instances(tis_to_clear, session)
 
     @provide_session
-    def get_task_instance(operator: BaseOperator, dttm, session: Session = 
NEW_SESSION) -> TaskInstance:
+    def get_task_instance(operator: BaseOperator, dttm, *, session: Session = 
NEW_SESSION) -> TaskInstance:
         if select is None:
             raise AirflowOptionalProviderFeatureException(
                 "sqlalchemy is required to get task instance. "
diff --git 
a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
 
b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
index e021724c2bd..0a6e6c61716 100644
--- 
a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
+++ 
b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
@@ -168,7 +168,7 @@ def test_get_task_instance_airflow2():
         with patch(
             
"airflow.providers.databricks.plugins.databricks_workflow.DagRun.find", 
return_value=[dag_run]
         ):
-            result = get_task_instance(operator, dttm, session)
+            result = get_task_instance(operator, dttm, session=session)
             assert result == dag_run
 
 
diff --git 
a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
 
b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
index be26ed6f557..2a4648a7603 100644
--- 
a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
+++ 
b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
@@ -34,7 +34,7 @@ log = logging.getLogger(__name__)
 
 
 @provide_session
-def cleanup_dag_permissions(dag_id: str, session: Session = NEW_SESSION) -> 
None:
+def cleanup_dag_permissions(dag_id: str, *, session: Session = NEW_SESSION) -> 
None:
     """
     Clean up DAG-specific permissions from Flask-AppBuilder tables.
 
@@ -190,7 +190,7 @@ def permissions_cleanup(args):
             cleanup_count = 0
             for dag_id in orphaned_dag_ids:
                 try:
-                    cleanup_dag_permissions(dag_id, session)
+                    cleanup_dag_permissions(dag_id, session=session)
                     cleanup_count += 1
                     if args.verbose:
                         print(f"Cleaned up permissions for DAG: {dag_id}")
diff --git 
a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
 
b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
index 0d570a1e321..3d6c1538d7e 100644
--- 
a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
+++ 
b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
@@ -96,7 +96,7 @@ class TestPermissionsCommand:
             permissions_command.permissions_cleanup(args)
 
         # Verify function calls - it should be called exactly once for the 
orphaned DAG
-        mock_cleanup_dag_permissions.assert_called_once_with("orphaned_dag", 
mock_session)
+        mock_cleanup_dag_permissions.assert_called_once_with("orphaned_dag", 
session=mock_session)
 
     
@patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
     @patch("airflow.providers.fab.auth_manager.models.Resource")
@@ -180,7 +180,7 @@ class TestPermissionsCommand:
             permissions_command.permissions_cleanup(args)
 
         # Should call cleanup_dag_permissions specifically for test_dag
-        mock_cleanup_dag_permissions.assert_called_once_with("test_dag", 
mock_session)
+        mock_cleanup_dag_permissions.assert_called_once_with("test_dag", 
session=mock_session)
 
     
@patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
     @patch("airflow.providers.fab.auth_manager.models.Resource")
@@ -280,7 +280,7 @@ class TestDagPermissions:
             session.commit()
 
             # Execute cleanup
-            cleanup_dag_permissions("target_dag", session)
+            cleanup_dag_permissions("target_dag", session=session)
 
             # Verify: target resource deleted, keep resource remains
             assert not session.get(Resource, target_resource.id)
@@ -300,7 +300,7 @@ class TestDagPermissions:
 
         with create_session() as session:
             initial_count = session.scalar(select(func.count(Resource.id)))
-            cleanup_dag_permissions("non_existent_dag", session)
+            cleanup_dag_permissions("non_existent_dag", session=session)
             assert session.scalar(select(func.count(Resource.id))) == 
initial_count
 
     def 
test_cleanup_dag_permissions_handles_resources_without_permissions(self):
@@ -319,7 +319,7 @@ class TestDagPermissions:
             session.commit()
             resource_id = resource.id
 
-            cleanup_dag_permissions("test_dag", session)
+            cleanup_dag_permissions("test_dag", session=session)
             assert not session.get(Resource, resource_id)
 
     def test_cleanup_dag_permissions_with_default_session(self):
diff --git 
a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py 
b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
index f0f47a2c0cf..659cc9f8bf3 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
@@ -119,7 +119,7 @@ class BigQueryInsertJobTrigger(BaseTrigger):
     if not AIRFLOW_V_3_3_PLUS:
 
         @provide_session
-        def get_task_instance(self, session: Session) -> TaskInstance:
+        def get_task_instance(self, *, session: Session) -> TaskInstance:
             task_instance = session.scalar(
                 select(TaskInstance).where(
                     TaskInstance.dag_id == self.task_instance.dag_id,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py 
b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
index 550fc7df975..047f04e34db 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
@@ -135,7 +135,7 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
     if not AIRFLOW_V_3_3_PLUS:
 
         @provide_session
-        def get_task_instance(self, session: Session) -> TaskInstance:
+        def get_task_instance(self, *, session: Session) -> TaskInstance:
             """
             Get the task instance for the current task.
 
@@ -287,7 +287,7 @@ class DataprocSubmitJobDirectTrigger(DataprocBaseTrigger):
     if not AIRFLOW_V_3_3_PLUS:
 
         @provide_session
-        def get_task_instance(self, session: Session) -> TaskInstance:
+        def get_task_instance(self, *, session: Session) -> TaskInstance:
             """
             Get the task instance for the current task.
 
@@ -431,7 +431,7 @@ class DataprocClusterTrigger(DataprocBaseTrigger):
     if not AIRFLOW_V_3_0_PLUS:
 
         @provide_session
-        def get_task_instance(self, session: Session) -> TaskInstance:
+        def get_task_instance(self, *, session: Session) -> TaskInstance:
             task_instance = session.scalar(
                 select(TaskInstance).where(
                     TaskInstance.dag_id == self.task_instance.dag_id,
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index de661f51f07..afc33804cd7 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -735,7 +735,7 @@ def is_selective_lineage_enabled(obj: DAG | SerializedDAG | 
AnyOperator) -> bool
 if not AIRFLOW_V_3_0_PLUS:
 
     @provide_session
-    def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
+    def is_ti_rescheduled_already(ti: TaskInstance, *, session=NEW_SESSION):
         try:
             from sqlalchemy import exists, select
         except ImportError:
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index 4aca1bfc4c1..ff271b3c513 100644
--- 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++ 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -464,7 +464,7 @@ class TriggerDagRunOperator(BaseOperator):
 
         @provide_session
         def _trigger_dag_run_af_2_execute_complete(
-            self, event_data: dict[str, Any], session: Session = NEW_SESSION
+            self, event_data: dict[str, Any], *, session: Session = NEW_SESSION
         ):
             # This logical_date is parsed from the return trigger event
             provided_logical_date = event_data["execution_dates"][0]
diff --git 
a/providers/standard/src/airflow/providers/standard/sensors/external_task.py 
b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index 1a4684b6c4b..cfc4c0c3faf 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -414,7 +414,9 @@ class ExternalTaskSensor(BaseSensorOperator):
     if not AIRFLOW_V_3_0_PLUS:
 
         @provide_session
-        def _poke_af2(self, dttm_filter: Sequence[datetime.datetime], session: 
Session = NEW_SESSION) -> bool:
+        def _poke_af2(
+            self, dttm_filter: Sequence[datetime.datetime], *, session: 
Session = NEW_SESSION
+        ) -> bool:
             if self.check_existence and not self._has_checked_existence:
                 self._check_for_existence(session=session)
 
@@ -562,7 +564,7 @@ class ExternalTaskSensor(BaseSensorOperator):
             self.external_task_group_id,
             self.external_dag_id,
             states,
-            session,
+            session=session,
         )
 
     def get_external_task_group_task_ids(
diff --git 
a/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py 
b/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py
index 20284b14ef2..f16e86bf34a 100644
--- a/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py
+++ b/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py
@@ -38,6 +38,7 @@ def _get_count(
     external_task_group_id,
     external_dag_id,
     states,
+    *,
     session: Session = NEW_SESSION,
 ) -> int:
     """
diff --git 
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py 
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index fde38e4e236..589bd32920e 100644
--- 
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++ 
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -54,7 +54,7 @@ from airflow.providers.standard.sensors.external_task import 
ExternalTaskMarker,
 from airflow.providers.standard.sensors.time import TimeSensor
 from airflow.providers.standard.triggers.external_task import WorkflowTrigger
 from airflow.timetables.base import DataInterval
-from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunType
 
@@ -1821,6 +1821,7 @@ def dag_bag_parent_child():
 @provide_session
 def run_tasks(
     dag_bag: DagBag,
+    *,
     logical_date=DEFAULT_DATE,
     session=NEW_SESSION,
 ) -> tuple[dict[str, DagRun], dict[str, TaskInstance]]:
@@ -1893,10 +1894,11 @@ def clear_tasks(
     dag_bag,
     dag,
     task,
-    session,
+    *,
     start_date=DEFAULT_DATE,
     end_date=DEFAULT_DATE,
     dry_run=False,
+    session=NEW_SESSION,
 ):
     """
     Clear the task and its downstream tasks recursively for the dag in the 
given dagbag.
@@ -1927,8 +1929,7 @@ def test_external_task_marker_transitive(dag_bag_ext):
 
 
 @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Different test for 3.0+")
-@provide_session
-def test_external_task_marker_clear_activate(dag_bag_parent_child, session):
+def test_external_task_marker_clear_activate(dag_bag_parent_child):
     """
     Test clearing tasks across DAGs and make sure the right DagRuns are 
activated.
     """
@@ -1939,22 +1940,23 @@ def 
test_external_task_marker_clear_activate(dag_bag_parent_child, session):
     run_tasks(dag_bag, logical_date=day_1)
     run_tasks(dag_bag, logical_date=day_2)
 
-    # Assert that dagruns of all the affected dags are set to SUCCESS before 
tasks are cleared.
-    for dag, execution_date in itertools.product(dag_bag.dags.values(), 
[day_1, day_2]):
-        dagrun = dag.get_dagrun(execution_date=execution_date, session=session)
-        dagrun.set_state(State.SUCCESS)
-    session.flush()
-
-    dag_0 = dag_bag.get_dag("parent_dag_0")
-    task_0 = dag_0.get_task("task_0")
-    clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2, 
session=session)
-
-    # Assert that dagruns of all the affected dags are set to QUEUED after 
tasks are cleared.
-    # Unaffected dagruns should be left as SUCCESS.
-    dagrun_0_1 = 
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_1, 
session=session)
-    dagrun_0_2 = 
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_2, 
session=session)
-    dagrun_1_1 = 
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_1, session=session)
-    dagrun_1_2 = 
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_2, session=session)
+    with create_session() as session:
+        # Assert that dagruns of all the affected dags are set to SUCCESS 
before tasks are cleared.
+        for dag, execution_date in itertools.product(dag_bag.dags.values(), 
[day_1, day_2]):
+            dagrun = dag.get_dagrun(execution_date=execution_date, 
session=session)
+            dagrun.set_state(State.SUCCESS)
+        session.flush()
+
+        dag_0 = dag_bag.get_dag("parent_dag_0")
+        task_0 = dag_0.get_task("task_0")
+        clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2, 
session=session)
+
+        # Assert that dagruns of all the affected dags are set to QUEUED after 
tasks are cleared.
+        # Unaffected dagruns should be left as SUCCESS.
+        dagrun_0_1 = 
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_1, 
session=session)
+        dagrun_0_2 = 
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_2, 
session=session)
+        dagrun_1_1 = 
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_1, session=session)
+        dagrun_1_2 = 
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_2, session=session)
 
     assert dagrun_0_1.state == State.QUEUED
     assert dagrun_0_2.state == State.QUEUED
diff --git a/scripts/ci/prek/known_provide_session_positional.txt 
b/scripts/ci/prek/known_provide_session_positional.txt
index 5826418eb11..1a773d36c0d 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -69,18 +69,4 @@ airflow-core/tests/unit/jobs/test_scheduler_job.py::1
 airflow-core/tests/unit/listeners/test_listeners.py::7
 airflow-core/tests/unit/models/test_taskinstance.py::4
 airflow-core/tests/unit/models/test_timestamp.py::2
-providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py::1
-providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py::1
-providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py::1
-providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py::1
-providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py::1
-providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py::2
 providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::1
-providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py::1
-providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py::1
-providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py::3
-providers/openlineage/src/airflow/providers/openlineage/utils/utils.py::1
-providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py::1
-providers/standard/src/airflow/providers/standard/sensors/external_task.py::1
-providers/standard/src/airflow/providers/standard/utils/sensor_helper.py::1
-providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py::3

Reply via email to