This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3baec70f90f Renaming old name: task_state to task_store across spark
provider (#68430)
3baec70f90f is described below
commit 3baec70f90fa8de146b11f0bdaa3b6eac922cbe8
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 15 12:03:31 2026 +0530
Renaming old name: task_state to task_store across spark provider (#68430)
---
providers/apache/spark/docs/operators.rst | 4 ++--
.../providers/apache/spark/operators/spark_submit.py | 6 +++---
.../unit/apache/spark/operators/test_spark_submit.py | 18 +++++++++---------
3 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/providers/apache/spark/docs/operators.rst
b/providers/apache/spark/docs/operators.rst
index 6bdd4bbcdc7..d4520578420 100644
--- a/providers/apache/spark/docs/operators.rst
+++ b/providers/apache/spark/docs/operators.rst
@@ -190,7 +190,7 @@ independently on the cluster. If the Airflow worker dies
while the Spark job is
Airflow loses track of it and the behaviour to submit a brand new job would be
wasting
the compute already done or even cause conflicts if the Spark job itself is
not designed to be idempotent.
-Now, the ``SparkSubmitOperator`` solves this by persisting the driver ID to
``task_state`` immediately after
+Now, the ``SparkSubmitOperator`` solves this by persisting the driver ID to
``task_state_store`` immediately after
submission. On retry, it reads the ID back and reconnects to the
already-running driver instead of
resubmitting.
@@ -212,7 +212,7 @@ The reconnection polling calls the Spark standalone REST API
See :doc:`connections/spark-submit` for how to configure these fields.
.. note::
- Crash recovery in cluster mode requires Airflow 3.3+ (``task_state``
support). On earlier
+ Crash recovery in cluster mode requires Airflow 3.3+ (``task_state_store``
support). On earlier
versions the operator falls back to the previous behavior of always
submitting fresh.
Tracking driver status via Kubernetes API
diff --git
a/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
b/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
index ac9b550409f..e2b6067d42c 100644
---
a/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
+++
b/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
@@ -37,7 +37,7 @@ except ImportError:
# ResumableJobMixin does not exist in Airflow 2, so we need to add a stub
to make it
# behave as before
class ResumableJobMixin: # type: ignore[no-redef]
- """Airflow 2 stub — no task_state, always submits fresh."""
+ """Airflow 2 stub — no task_state_store, always submits fresh."""
external_id_key: str = "remote_job_id"
@@ -264,7 +264,7 @@ class SparkSubmitOperator(ResumableJobMixin, BaseOperator):
if hook._should_track_driver_status:
if self.reconnect_on_retry:
return self.execute_resumable(context)
- # reconnect_on_retry=False: still submit-and-poll, just skip
task_state persistence.
+ # reconnect_on_retry=False: still submit-and-poll, just skip
task_state_store persistence.
driver_id = self.submit_job(context)
self.poll_until_complete(driver_id, context)
return self.get_job_result(driver_id, context)
@@ -284,7 +284,7 @@ class SparkSubmitOperator(ResumableJobMixin, BaseOperator):
hook._validate_yarn_track_via_rm_api_config()
if self.reconnect_on_retry:
return self.execute_resumable(context)
- # reconnect_on_retry=False: still submit-and-poll, just skip
task_state persistence.
+ # reconnect_on_retry=False: still submit-and-poll, just skip
task_state_store persistence.
driver_id = self.submit_job(context)
self.poll_until_complete(driver_id, context)
return self.get_job_result(driver_id, context)
diff --git
a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
index a1fe95d6003..9aa2c206406 100644
---
a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
+++
b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
@@ -480,8 +480,8 @@ class TestSparkSubmitOperator:
}
-class FakeTaskState:
- """In-memory task state for tests."""
+class FakeTaskStateStore:
+ """In-memory task state store for tests."""
def __init__(self, stored: dict[str, str] | None = None):
self._store: dict[str, str] = dict(stored or {})
@@ -528,7 +528,7 @@ class TestSparkSubmitOperatorResumable:
operator._hook = self._make_hook(should_track=True)
operator._hook.submit.return_value = "driver-001"
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
persisted_before_poll = []
def track_poll(external_id, context):
@@ -555,7 +555,7 @@ class TestSparkSubmitOperatorResumable:
operator = self._make_operator()
operator._hook = self._make_hook(should_track=True)
operator._hook.submit.return_value = "driver-new"
- task_store = FakeTaskState({"spark_job_id": "driver-001"})
+ task_store = FakeTaskStateStore({"spark_job_id": "driver-001"})
operator.get_job_status = lambda external_id, context: prior_status
polled = []
@@ -590,7 +590,7 @@ class TestSparkSubmitOperatorResumable:
operator = self._make_operator(reconnect_on_retry=False)
operator._hook = self._make_hook(should_track=True)
operator._hook.submit.return_value = "driver-new"
- task_store = FakeTaskState({"spark_job_id": "driver-old"})
+ task_store = FakeTaskStateStore({"spark_job_id": "driver-old"})
polled = []
operator.poll_until_complete = lambda external_id, context:
polled.append(external_id)
@@ -733,7 +733,7 @@ class TestSparkSubmitOperatorResumable:
operator._hook._yarn_application_id = "application_1234_0001"
operator._hook.submit.return_value = None
- task_store = FakeTaskState()
+ task_store = FakeTaskStateStore()
persisted_before_poll = []
def track_poll(external_id, context):
@@ -747,7 +747,7 @@ class TestSparkSubmitOperatorResumable:
def test_yarn_retry_reconnects_to_running_app(self):
operator = self._make_operator()
operator._hook = self._make_hook(is_yarn_cluster=True)
- task_store = FakeTaskState({"spark_job_id": "application_1234_0001"})
+ task_store = FakeTaskStateStore({"spark_job_id":
"application_1234_0001"})
operator.get_job_status = lambda external_id, context: "RUNNING"
polled = []
@@ -761,7 +761,7 @@ class TestSparkSubmitOperatorResumable:
def test_yarn_retry_skips_already_succeeded_app(self):
operator = self._make_operator()
operator._hook = self._make_hook(is_yarn_cluster=True)
- task_store = FakeTaskState({"spark_job_id": "application_1234_0001"})
+ task_store = FakeTaskStateStore({"spark_job_id":
"application_1234_0001"})
operator.get_job_status = lambda external_id, context: "SUCCEEDED"
@@ -775,7 +775,7 @@ class TestSparkSubmitOperatorResumable:
operator._hook._conf = {}
operator._hook._yarn_application_id = "application_1234_0002"
operator._hook.submit.return_value = None
- task_store = FakeTaskState({"spark_job_id": "application_1234_0001"})
+ task_store = FakeTaskStateStore({"spark_job_id":
"application_1234_0001"})
operator.get_job_status = lambda external_id, context: "FAILED"
polled = []