This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3baec70f90f Renaming old name: task_state to task_store across spark 
provider (#68430)
3baec70f90f is described below

commit 3baec70f90fa8de146b11f0bdaa3b6eac922cbe8
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 15 12:03:31 2026 +0530

    Renaming old name: task_state to task_store across spark provider (#68430)
---
 providers/apache/spark/docs/operators.rst              |  4 ++--
 .../providers/apache/spark/operators/spark_submit.py   |  6 +++---
 .../unit/apache/spark/operators/test_spark_submit.py   | 18 +++++++++---------
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/providers/apache/spark/docs/operators.rst 
b/providers/apache/spark/docs/operators.rst
index 6bdd4bbcdc7..d4520578420 100644
--- a/providers/apache/spark/docs/operators.rst
+++ b/providers/apache/spark/docs/operators.rst
@@ -190,7 +190,7 @@ independently on the cluster. If the Airflow worker dies 
while the Spark job is
 Airflow loses track of it and the behaviour to submit a brand new job would be 
wasting
 the compute already done or even cause conflicts if the Spark job itself is 
not designed to be idempotent.
 
-Now, the ``SparkSubmitOperator`` solves this by persisting the driver ID to 
``task_state`` immediately after
+Now, the ``SparkSubmitOperator`` solves this by persisting the driver ID to 
``task_state_store`` immediately after
 submission. On retry, it reads the ID back and reconnects to the 
already-running driver instead of
 resubmitting.
 
@@ -212,7 +212,7 @@ The reconnection polling calls the Spark standalone REST API
 See :doc:`connections/spark-submit` for how to configure these fields.
 
 .. note::
-    Crash recovery in cluster mode requires Airflow 3.3+ (``task_state`` 
support). On earlier
+    Crash recovery in cluster mode requires Airflow 3.3+ (``task_state_store`` 
support). On earlier
     versions the operator falls back to the previous behavior of always 
submitting fresh.
 
 Tracking driver status via Kubernetes API
diff --git 
a/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
 
b/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
index ac9b550409f..e2b6067d42c 100644
--- 
a/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
+++ 
b/providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
@@ -37,7 +37,7 @@ except ImportError:
     # ResumableJobMixin does not exist in Airflow 2, so we need to add a stub 
to make it
     # behave as before
     class ResumableJobMixin:  # type: ignore[no-redef]
-        """Airflow 2 stub — no task_state, always submits fresh."""
+        """Airflow 2 stub — no task_state_store, always submits fresh."""
 
         external_id_key: str = "remote_job_id"
 
@@ -264,7 +264,7 @@ class SparkSubmitOperator(ResumableJobMixin, BaseOperator):
         if hook._should_track_driver_status:
             if self.reconnect_on_retry:
                 return self.execute_resumable(context)
-            # reconnect_on_retry=False: still submit-and-poll, just skip 
task_state persistence.
+            # reconnect_on_retry=False: still submit-and-poll, just skip 
task_state_store persistence.
             driver_id = self.submit_job(context)
             self.poll_until_complete(driver_id, context)
             return self.get_job_result(driver_id, context)
@@ -284,7 +284,7 @@ class SparkSubmitOperator(ResumableJobMixin, BaseOperator):
                 hook._validate_yarn_track_via_rm_api_config()
                 if self.reconnect_on_retry:
                     return self.execute_resumable(context)
-                # reconnect_on_retry=False: still submit-and-poll, just skip 
task_state persistence.
+                # reconnect_on_retry=False: still submit-and-poll, just skip 
task_state_store persistence.
                 driver_id = self.submit_job(context)
                 self.poll_until_complete(driver_id, context)
                 return self.get_job_result(driver_id, context)
diff --git 
a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py 
b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
index a1fe95d6003..9aa2c206406 100644
--- 
a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
+++ 
b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
@@ -480,8 +480,8 @@ class TestSparkSubmitOperator:
         }
 
 
-class FakeTaskState:
-    """In-memory task state for tests."""
+class FakeTaskStateStore:
+    """In-memory task state store for tests."""
 
     def __init__(self, stored: dict[str, str] | None = None):
         self._store: dict[str, str] = dict(stored or {})
@@ -528,7 +528,7 @@ class TestSparkSubmitOperatorResumable:
         operator._hook = self._make_hook(should_track=True)
         operator._hook.submit.return_value = "driver-001"
 
-        task_store = FakeTaskState()
+        task_store = FakeTaskStateStore()
         persisted_before_poll = []
 
         def track_poll(external_id, context):
@@ -555,7 +555,7 @@ class TestSparkSubmitOperatorResumable:
         operator = self._make_operator()
         operator._hook = self._make_hook(should_track=True)
         operator._hook.submit.return_value = "driver-new"
-        task_store = FakeTaskState({"spark_job_id": "driver-001"})
+        task_store = FakeTaskStateStore({"spark_job_id": "driver-001"})
 
         operator.get_job_status = lambda external_id, context: prior_status
         polled = []
@@ -590,7 +590,7 @@ class TestSparkSubmitOperatorResumable:
         operator = self._make_operator(reconnect_on_retry=False)
         operator._hook = self._make_hook(should_track=True)
         operator._hook.submit.return_value = "driver-new"
-        task_store = FakeTaskState({"spark_job_id": "driver-old"})
+        task_store = FakeTaskStateStore({"spark_job_id": "driver-old"})
         polled = []
         operator.poll_until_complete = lambda external_id, context: 
polled.append(external_id)
 
@@ -733,7 +733,7 @@ class TestSparkSubmitOperatorResumable:
         operator._hook._yarn_application_id = "application_1234_0001"
         operator._hook.submit.return_value = None
 
-        task_store = FakeTaskState()
+        task_store = FakeTaskStateStore()
         persisted_before_poll = []
 
         def track_poll(external_id, context):
@@ -747,7 +747,7 @@ class TestSparkSubmitOperatorResumable:
     def test_yarn_retry_reconnects_to_running_app(self):
         operator = self._make_operator()
         operator._hook = self._make_hook(is_yarn_cluster=True)
-        task_store = FakeTaskState({"spark_job_id": "application_1234_0001"})
+        task_store = FakeTaskStateStore({"spark_job_id": 
"application_1234_0001"})
 
         operator.get_job_status = lambda external_id, context: "RUNNING"
         polled = []
@@ -761,7 +761,7 @@ class TestSparkSubmitOperatorResumable:
     def test_yarn_retry_skips_already_succeeded_app(self):
         operator = self._make_operator()
         operator._hook = self._make_hook(is_yarn_cluster=True)
-        task_store = FakeTaskState({"spark_job_id": "application_1234_0001"})
+        task_store = FakeTaskStateStore({"spark_job_id": 
"application_1234_0001"})
 
         operator.get_job_status = lambda external_id, context: "SUCCEEDED"
 
@@ -775,7 +775,7 @@ class TestSparkSubmitOperatorResumable:
         operator._hook._conf = {}
         operator._hook._yarn_application_id = "application_1234_0002"
         operator._hook.submit.return_value = None
-        task_store = FakeTaskState({"spark_job_id": "application_1234_0001"})
+        task_store = FakeTaskStateStore({"spark_job_id": 
"application_1234_0001"})
 
         operator.get_job_status = lambda external_id, context: "FAILED"
         polled = []

Reply via email to