This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b66bf510ab2 Remove further findings from positional session check (#67712)
b66bf510ab2 is described below
commit b66bf510ab2f1e7f6e407ec75233acae81c48d0c
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri May 29 21:12:02 2026 +0200
Remove further findings from positional session check (#67712)
* Remove further findings from positional session check
* Fix pytests
---
.../airflow/providers/amazon/aws/triggers/emr.py | 2 +-
.../kubernetes/executors/kubernetes_executor.py | 1 +
.../cncf/kubernetes/template_rendering.py | 3 +-
.../providers/cncf/kubernetes/triggers/pod.py | 2 +-
.../unit/common/ai/plugins/test_hitl_review.py | 12 +++----
.../databricks/plugins/databricks_workflow.py | 4 +--
.../databricks/plugins/test_databricks_workflow.py | 2 +-
.../cli_commands/permissions_command.py | 4 +--
.../cli_commands/test_permissions_command.py | 10 +++---
.../providers/google/cloud/triggers/bigquery.py | 2 +-
.../providers/google/cloud/triggers/dataproc.py | 6 ++--
.../airflow/providers/openlineage/utils/utils.py | 2 +-
.../providers/standard/operators/trigger_dagrun.py | 2 +-
.../providers/standard/sensors/external_task.py | 6 ++--
.../providers/standard/utils/sensor_helper.py | 1 +
.../standard/sensors/test_external_task_sensor.py | 42 +++++++++++-----------
.../ci/prek/known_provide_session_positional.txt | 14 --------
17 files changed, 54 insertions(+), 61 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py
index 3c4614be857..c98641b0c6b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py
@@ -419,7 +419,7 @@ class EmrServerlessStartJobTrigger(AwsBaseWaiterTrigger):
if not AIRFLOW_V_3_0_PLUS:
@provide_session
- def get_task_instance(self, session: Session) -> TaskInstance:
+ def get_task_instance(self, *, session: Session) -> TaskInstance:
"""Get the task instance for the current trigger (Airflow 2.x compatibility)."""
from sqlalchemy import select
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index d109ac097dd..481960b71d2 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -402,6 +402,7 @@ class KubernetesExecutor(BaseExecutor):
def _change_state(
self,
results: KubernetesResults,
+ *,
session: Session = NEW_SESSION,
) -> None:
"""Change state of the task based on KubernetesResults."""
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
index ed9a265728c..6920e8400f2 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
@@ -85,8 +85,9 @@ def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None:
return sanitized_pod
+# Note: AIRFLOW_V_3_0_PLUS - this is only needed for Airflow 2.x, can be removed if the support is dropped
@provide_session
-def get_rendered_k8s_spec(task_instance: TaskInstance, session=NEW_SESSION) -> dict | None:
+def get_rendered_k8s_spec(task_instance: TaskInstance, *, session=NEW_SESSION) -> dict | None:
"""Fetch rendered template fields from DB."""
from airflow.models.renderedtifields import RenderedTaskInstanceFields
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 65b1b45bb04..207e97280a5 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -366,7 +366,7 @@ class KubernetesPodTrigger(BaseTrigger):
if not AIRFLOW_V_3_0_PLUS:
@provide_session
- def get_task_instance(self, session: Session) -> TaskInstance:
+ def get_task_instance(self, *, session: Session) -> TaskInstance:
"""Get the task instance for this trigger from the database
(Airflow 2.x only)."""
task_instance = session.scalar(
select(TaskInstance).where(
diff --git
a/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py
b/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py
index 1d5ba3c83c1..1f10af77835 100644
--- a/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py
+++ b/providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py
@@ -88,7 +88,6 @@ def _clear_db():
@provide_session
def _create_hitl_session(
- session=None,
*,
dag_id=TEST_DAG_ID,
run_id=TEST_RUN_ID,
@@ -98,6 +97,7 @@ def _create_hitl_session(
iteration=1,
prompt="Summarize",
current_output="Initial output",
+ session=None,
):
"""Create HITL session and output XCom entries in the database."""
sess = AgentSessionData(
@@ -730,11 +730,11 @@ class TestBuildSessionResponse:
dag_maker.create_dagrun(run_id=TEST_RUN_ID,
run_type=DagRunType.MANUAL, logical_date=logical_date)
dag_maker.sync_dagbag_to_db()
_create_hitl_session(
- session=session,
status=status,
iteration=iteration,
prompt=prompt,
current_output=current_output,
+ session=session,
)
session.commit()
@@ -757,11 +757,11 @@ class TestBuildSessionResponse:
dag_maker.create_dagrun(run_id=TEST_RUN_ID,
run_type=DagRunType.MANUAL, logical_date=logical_date)
dag_maker.sync_dagbag_to_db()
_create_hitl_session(
- session=session,
status=SessionStatus.PENDING_REVIEW,
iteration=1,
prompt="Summarize",
current_output="First output",
+ session=session,
)
session.commit()
@@ -785,11 +785,11 @@ class TestBuildSessionResponse:
dag_maker.create_dagrun(run_id=TEST_RUN_ID,
run_type=DagRunType.MANUAL, logical_date=logical_date)
dag_maker.sync_dagbag_to_db()
_create_hitl_session(
- session=session,
status=SessionStatus.PENDING_REVIEW,
iteration=1,
prompt="p",
current_output="Initial",
+ session=session,
)
XComModel.set(
key=f"{XCOM_HUMAN_FEEDBACK_PREFIX}1",
@@ -986,11 +986,11 @@ class TestFindSessionEndpoint:
def test_returns_max_iterations_exceeded_status(self, test_client,
session, dag_maker):
"""Find endpoint returns max_iterations_exceeded when session has that
status."""
_create_hitl_session(
- session=session,
status=SessionStatus.MAX_ITERATIONS_EXCEEDED,
iteration=5,
prompt="p",
current_output="output",
+ session=session,
)
session.commit()
@@ -1012,11 +1012,11 @@ class TestFindSessionEndpoint:
def test_returns_timeout_exceeded_status(self, test_client, session,
dag_maker):
"""Find endpoint returns timeout_exceeded when session has that
status."""
_create_hitl_session(
- session=session,
status=SessionStatus.TIMEOUT_EXCEEDED,
iteration=2,
prompt="p",
current_output="output",
+ session=session,
)
session.commit()
diff --git
a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
index d50acc800e7..756bbf6bdee 100644
---
a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
+++
b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py
@@ -164,7 +164,7 @@ if not AIRFLOW_V_3_0_PLUS:
@provide_session
def _clear_task_instances(
- dag_id: str, run_id: str, task_ids: list[str], log: Logger, session:
Session = NEW_SESSION
+ dag_id: str, run_id: str, task_ids: list[str], log: Logger, *,
session: Session = NEW_SESSION
) -> None:
dag = _get_dag(dag_id, session=session)
log.debug("task_ids %s to clear", str(task_ids))
@@ -173,7 +173,7 @@ if not AIRFLOW_V_3_0_PLUS:
clear_task_instances(tis_to_clear, session)
@provide_session
- def get_task_instance(operator: BaseOperator, dttm, session: Session =
NEW_SESSION) -> TaskInstance:
+ def get_task_instance(operator: BaseOperator, dttm, *, session: Session =
NEW_SESSION) -> TaskInstance:
if select is None:
raise AirflowOptionalProviderFeatureException(
"sqlalchemy is required to get task instance. "
diff --git
a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
index e021724c2bd..0a6e6c61716 100644
---
a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
+++
b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py
@@ -168,7 +168,7 @@ def test_get_task_instance_airflow2():
with patch(
"airflow.providers.databricks.plugins.databricks_workflow.DagRun.find",
return_value=[dag_run]
):
- result = get_task_instance(operator, dttm, session)
+ result = get_task_instance(operator, dttm, session=session)
assert result == dag_run
diff --git
a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
index be26ed6f557..2a4648a7603 100644
---
a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
+++
b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
@@ -34,7 +34,7 @@ log = logging.getLogger(__name__)
@provide_session
-def cleanup_dag_permissions(dag_id: str, session: Session = NEW_SESSION) ->
None:
+def cleanup_dag_permissions(dag_id: str, *, session: Session = NEW_SESSION) ->
None:
"""
Clean up DAG-specific permissions from Flask-AppBuilder tables.
@@ -190,7 +190,7 @@ def permissions_cleanup(args):
cleanup_count = 0
for dag_id in orphaned_dag_ids:
try:
- cleanup_dag_permissions(dag_id, session)
+ cleanup_dag_permissions(dag_id, session=session)
cleanup_count += 1
if args.verbose:
print(f"Cleaned up permissions for DAG: {dag_id}")
diff --git
a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
index 0d570a1e321..3d6c1538d7e 100644
---
a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
+++
b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
@@ -96,7 +96,7 @@ class TestPermissionsCommand:
permissions_command.permissions_cleanup(args)
# Verify function calls - it should be called exactly once for the
orphaned DAG
- mock_cleanup_dag_permissions.assert_called_once_with("orphaned_dag",
mock_session)
+ mock_cleanup_dag_permissions.assert_called_once_with("orphaned_dag",
session=mock_session)
@patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
@patch("airflow.providers.fab.auth_manager.models.Resource")
@@ -180,7 +180,7 @@ class TestPermissionsCommand:
permissions_command.permissions_cleanup(args)
# Should call cleanup_dag_permissions specifically for test_dag
- mock_cleanup_dag_permissions.assert_called_once_with("test_dag",
mock_session)
+ mock_cleanup_dag_permissions.assert_called_once_with("test_dag",
session=mock_session)
@patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
@patch("airflow.providers.fab.auth_manager.models.Resource")
@@ -280,7 +280,7 @@ class TestDagPermissions:
session.commit()
# Execute cleanup
- cleanup_dag_permissions("target_dag", session)
+ cleanup_dag_permissions("target_dag", session=session)
# Verify: target resource deleted, keep resource remains
assert not session.get(Resource, target_resource.id)
@@ -300,7 +300,7 @@ class TestDagPermissions:
with create_session() as session:
initial_count = session.scalar(select(func.count(Resource.id)))
- cleanup_dag_permissions("non_existent_dag", session)
+ cleanup_dag_permissions("non_existent_dag", session=session)
assert session.scalar(select(func.count(Resource.id))) ==
initial_count
def test_cleanup_dag_permissions_handles_resources_without_permissions(self):
@@ -319,7 +319,7 @@ class TestDagPermissions:
session.commit()
resource_id = resource.id
- cleanup_dag_permissions("test_dag", session)
+ cleanup_dag_permissions("test_dag", session=session)
assert not session.get(Resource, resource_id)
def test_cleanup_dag_permissions_with_default_session(self):
diff --git
a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
index f0f47a2c0cf..659cc9f8bf3 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
@@ -119,7 +119,7 @@ class BigQueryInsertJobTrigger(BaseTrigger):
if not AIRFLOW_V_3_3_PLUS:
@provide_session
- def get_task_instance(self, session: Session) -> TaskInstance:
+ def get_task_instance(self, *, session: Session) -> TaskInstance:
task_instance = session.scalar(
select(TaskInstance).where(
TaskInstance.dag_id == self.task_instance.dag_id,
diff --git
a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
index 550fc7df975..047f04e34db 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
@@ -135,7 +135,7 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
if not AIRFLOW_V_3_3_PLUS:
@provide_session
- def get_task_instance(self, session: Session) -> TaskInstance:
+ def get_task_instance(self, *, session: Session) -> TaskInstance:
"""
Get the task instance for the current task.
@@ -287,7 +287,7 @@ class DataprocSubmitJobDirectTrigger(DataprocBaseTrigger):
if not AIRFLOW_V_3_3_PLUS:
@provide_session
- def get_task_instance(self, session: Session) -> TaskInstance:
+ def get_task_instance(self, *, session: Session) -> TaskInstance:
"""
Get the task instance for the current task.
@@ -431,7 +431,7 @@ class DataprocClusterTrigger(DataprocBaseTrigger):
if not AIRFLOW_V_3_0_PLUS:
@provide_session
- def get_task_instance(self, session: Session) -> TaskInstance:
+ def get_task_instance(self, *, session: Session) -> TaskInstance:
task_instance = session.scalar(
select(TaskInstance).where(
TaskInstance.dag_id == self.task_instance.dag_id,
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index de661f51f07..afc33804cd7 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -735,7 +735,7 @@ def is_selective_lineage_enabled(obj: DAG | SerializedDAG |
AnyOperator) -> bool
if not AIRFLOW_V_3_0_PLUS:
@provide_session
- def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
+ def is_ti_rescheduled_already(ti: TaskInstance, *, session=NEW_SESSION):
try:
from sqlalchemy import exists, select
except ImportError:
diff --git
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index 4aca1bfc4c1..ff271b3c513 100644
---
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -464,7 +464,7 @@ class TriggerDagRunOperator(BaseOperator):
@provide_session
def _trigger_dag_run_af_2_execute_complete(
- self, event_data: dict[str, Any], session: Session = NEW_SESSION
+ self, event_data: dict[str, Any], *, session: Session = NEW_SESSION
):
# This logical_date is parsed from the return trigger event
provided_logical_date = event_data["execution_dates"][0]
diff --git
a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index 1a4684b6c4b..cfc4c0c3faf 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -414,7 +414,9 @@ class ExternalTaskSensor(BaseSensorOperator):
if not AIRFLOW_V_3_0_PLUS:
@provide_session
- def _poke_af2(self, dttm_filter: Sequence[datetime.datetime], session:
Session = NEW_SESSION) -> bool:
+ def _poke_af2(
+ self, dttm_filter: Sequence[datetime.datetime], *, session:
Session = NEW_SESSION
+ ) -> bool:
if self.check_existence and not self._has_checked_existence:
self._check_for_existence(session=session)
@@ -562,7 +564,7 @@ class ExternalTaskSensor(BaseSensorOperator):
self.external_task_group_id,
self.external_dag_id,
states,
- session,
+ session=session,
)
def get_external_task_group_task_ids(
diff --git
a/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py
b/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py
index 20284b14ef2..f16e86bf34a 100644
--- a/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py
+++ b/providers/standard/src/airflow/providers/standard/utils/sensor_helper.py
@@ -38,6 +38,7 @@ def _get_count(
external_task_group_id,
external_dag_id,
states,
+ *,
session: Session = NEW_SESSION,
) -> int:
"""
diff --git
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index fde38e4e236..589bd32920e 100644
---
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -54,7 +54,7 @@ from airflow.providers.standard.sensors.external_task import
ExternalTaskMarker,
from airflow.providers.standard.sensors.time import TimeSensor
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.timetables.base import DataInterval
-from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType
@@ -1821,6 +1821,7 @@ def dag_bag_parent_child():
@provide_session
def run_tasks(
dag_bag: DagBag,
+ *,
logical_date=DEFAULT_DATE,
session=NEW_SESSION,
) -> tuple[dict[str, DagRun], dict[str, TaskInstance]]:
@@ -1893,10 +1894,11 @@ def clear_tasks(
dag_bag,
dag,
task,
- session,
+ *,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
dry_run=False,
+ session=NEW_SESSION,
):
"""
Clear the task and its downstream tasks recursively for the dag in the
given dagbag.
@@ -1927,8 +1929,7 @@ def test_external_task_marker_transitive(dag_bag_ext):
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Different test for 3.0+")
-@provide_session
-def test_external_task_marker_clear_activate(dag_bag_parent_child, session):
+def test_external_task_marker_clear_activate(dag_bag_parent_child):
"""
Test clearing tasks across DAGs and make sure the right DagRuns are
activated.
"""
@@ -1939,22 +1940,23 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child, session):
run_tasks(dag_bag, logical_date=day_1)
run_tasks(dag_bag, logical_date=day_2)
- # Assert that dagruns of all the affected dags are set to SUCCESS before
tasks are cleared.
- for dag, execution_date in itertools.product(dag_bag.dags.values(),
[day_1, day_2]):
- dagrun = dag.get_dagrun(execution_date=execution_date, session=session)
- dagrun.set_state(State.SUCCESS)
- session.flush()
-
- dag_0 = dag_bag.get_dag("parent_dag_0")
- task_0 = dag_0.get_task("task_0")
- clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2,
session=session)
-
- # Assert that dagruns of all the affected dags are set to QUEUED after
tasks are cleared.
- # Unaffected dagruns should be left as SUCCESS.
- dagrun_0_1 =
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_1,
session=session)
- dagrun_0_2 =
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_2,
session=session)
- dagrun_1_1 =
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_1, session=session)
- dagrun_1_2 =
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_2, session=session)
+ with create_session() as session:
+ # Assert that dagruns of all the affected dags are set to SUCCESS
before tasks are cleared.
+ for dag, execution_date in itertools.product(dag_bag.dags.values(),
[day_1, day_2]):
+ dagrun = dag.get_dagrun(execution_date=execution_date,
session=session)
+ dagrun.set_state(State.SUCCESS)
+ session.flush()
+
+ dag_0 = dag_bag.get_dag("parent_dag_0")
+ task_0 = dag_0.get_task("task_0")
+ clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2,
session=session)
+
+ # Assert that dagruns of all the affected dags are set to QUEUED after
tasks are cleared.
+ # Unaffected dagruns should be left as SUCCESS.
+ dagrun_0_1 =
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_1,
session=session)
+ dagrun_0_2 =
dag_bag.get_dag("parent_dag_0").get_dagrun(execution_date=day_2,
session=session)
+ dagrun_1_1 =
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_1, session=session)
+ dagrun_1_2 =
dag_bag.get_dag("child_dag_1").get_dagrun(execution_date=day_2, session=session)
assert dagrun_0_1.state == State.QUEUED
assert dagrun_0_2.state == State.QUEUED
diff --git a/scripts/ci/prek/known_provide_session_positional.txt
b/scripts/ci/prek/known_provide_session_positional.txt
index 5826418eb11..1a773d36c0d 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -69,18 +69,4 @@ airflow-core/tests/unit/jobs/test_scheduler_job.py::1
airflow-core/tests/unit/listeners/test_listeners.py::7
airflow-core/tests/unit/models/test_taskinstance.py::4
airflow-core/tests/unit/models/test_timestamp.py::2
-providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py::1
-providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py::1
-providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py::1
-providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py::1
-providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py::1
-providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py::2
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::1
-providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py::1
-providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py::1
-providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py::3
-providers/openlineage/src/airflow/providers/openlineage/utils/utils.py::1
-providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py::1
-providers/standard/src/airflow/providers/standard/sensors/external_task.py::1
-providers/standard/src/airflow/providers/standard/utils/sensor_helper.py::1
-providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py::3