This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1dc71d8b9b5 fix: fix missing rename in test_spark_submit (#68565)
1dc71d8b9b5 is described below
commit 1dc71d8b9b56829ad2e1c84e7559eb0f8c5e7617
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 15 18:46:28 2026 +0800
fix: fix missing rename in test_spark_submit (#68565)
---
.../tests/unit/apache/spark/operators/test_spark_submit.py | 14 +++++++-------
task-sdk/src/airflow/sdk/execution_time/context.py | 2 +-
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
index fa41b0af446..9708aab1a02 100644
---
a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
+++
b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
@@ -954,7 +954,7 @@ class TestSparkSubmitOperatorK8sTracking:
def test_k8s_get_job_status_returns_k8s_driver_status(self):
operator = self._make_operator(track_driver_via_k8s_api=True)
operator._hook = self._make_k8s_hook()
- task_store = FakeTaskState({"k8s_driver_status": "Succeeded"})
+ task_store = FakeTaskStateStore({"k8s_driver_status": "Succeeded"})
with
mock.patch("airflow.providers.apache.spark.operators.spark_submit.kube_client")
as mock_kube:
result = operator.get_job_status("mynamespace:spark-abc-driver",
{"task_state_store": task_store})
@@ -965,7 +965,7 @@ class TestSparkSubmitOperatorK8sTracking:
def
test_k8s_get_job_status_queries_k8s_api_when_no_k8s_driver_status(self):
operator = self._make_operator(track_driver_via_k8s_api=True)
operator._hook = self._make_k8s_hook()
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
mock_pod = MagicMock()
mock_pod.status.phase = "Running"
@@ -1033,7 +1033,7 @@ class TestSparkSubmitOperatorK8sTracking:
hook = self._make_k8s_hook()
hook._poll_k8s_driver_via_api.return_value = "Succeeded"
operator._hook = hook
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
operator.poll_until_complete("mynamespace:spark-abc-driver",
{"task_state_store": task_store})
@@ -1044,7 +1044,7 @@ class TestSparkSubmitOperatorK8sTracking:
hook = self._make_k8s_hook()
hook._poll_k8s_driver_via_api.return_value = "Succeeded"
operator._hook = hook
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
operator.poll_until_complete("mynamespace:spark-abc-driver",
{"task_state_store": task_store})
@@ -1055,7 +1055,7 @@ class TestSparkSubmitOperatorK8sTracking:
hook = self._make_k8s_hook()
hook._poll_k8s_driver_via_api.side_effect = RuntimeError("Spark
application failed (phase=Failed)")
operator._hook = hook
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
with pytest.raises(RuntimeError, match="phase=Failed"):
operator.poll_until_complete("mynamespace:spark-abc-driver",
{"task_state_store": task_store})
@@ -1079,7 +1079,7 @@ class TestSparkSubmitOperatorK8sTracking:
hook._kubernetes_driver_pod = "spark-abc-driver"
hook._connection = {"namespace": "mynamespace"}
operator._hook = hook
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
persisted_before_poll: list[str | None] = []
def track_poll(external_id, context):
@@ -1102,7 +1102,7 @@ class TestSparkSubmitOperatorK8sTracking:
hook._kubernetes_driver_pod = "spark-abc-driver"
hook._connection = {"namespace": "mynamespace"}
operator._hook = hook
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
operator.poll_until_complete = lambda external_id, context: None
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index c097772e804..92dfa47dd5c 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -742,7 +742,7 @@ class AssetStateStoreAccessor:
ref = backend.serialize_asset_state_store_to_ref(value=value,
key=key, scope=scope)
stored = _wrap_external_ref(ref)
- if ((limit := conf.getint("state_store", "max_value_storage_bytes")) >
0):
+ if (limit := conf.getint("state_store", "max_value_storage_bytes")) >
0:
serialized_size = len(json.dumps(stored))
if serialized_size > limit:
log.warning(