amoghrajesh commented on code in PR #67299:
URL: https://github.com/apache/airflow/pull/67299#discussion_r3339416293


##########
airflow-core/docs/administration-and-deployment/state-cleanup.rst:
##########
@@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-cleanup:
+
+State Cleanup
+=============
+
+.. versionadded:: 3.3
+
+Airflow does not automatically purge task state rows on a schedule. Cleanup 
(also known as "garbage collection") is your responsibility and must be 
triggered explicitly via the CLI. This page explains what gets cleaned up, how 
to run it, and how to integrate it into a recurring maintenance workflow.
+
+
+What gets cleaned up
+--------------------
+
+The cleanup command operates only on **task state** rows in the 
``MetastoreStateBackend``. Asset state rows are never touched by this command. 
Asset state rows are removed only by the orphan sweep when an asset is 
deactivated (see :ref:`state-store`).
+
+A task state row is eligible for deletion when its ``expires_at`` timestamp is 
in the past. ``expires_at`` is computed on the worker at write time:

Review Comment:
   ```suggestion
   A task state row is eligible for deletion when it expires, i.e. when its 
``expires_at`` timestamp is in the past. ``expires_at`` is computed on the 
worker when a task state row is written:
   ```
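
A minimal sketch of the eligibility rule described in this suggestion, assuming each row carries a timezone-aware `expires_at` that is `None` (SQL `NULL`) when the key has no expiry. `is_eligible_for_cleanup` is a hypothetical name used for illustration, not Airflow's implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_eligible_for_cleanup(expires_at: Optional[datetime], now: datetime) -> bool:
    """Hypothetical helper: True only for rows with a non-null, past expiry."""
    # A NULL expires_at covers NEVER_EXPIRE keys and keys written while
    # default_retention_days = 0; cleanup always skips them.
    if expires_at is None:
        return False
    return expires_at < now


now = datetime(2025, 1, 10, tzinfo=timezone.utc)
print(is_eligible_for_cleanup(now - timedelta(days=1), now))  # True
print(is_eligible_for_cleanup(now + timedelta(days=1), now))  # False
print(is_eligible_for_cleanup(None, now))  # False
```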



##########
airflow-core/docs/administration-and-deployment/state-cleanup.rst:
##########
@@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-cleanup:
+
+State Cleanup
+=============
+
+.. versionadded:: 3.3
+
+Airflow does not automatically purge task state rows on a schedule. Cleanup 
(also known as "garbage collection") is your responsibility and must be 
triggered explicitly via the CLI. This page explains what gets cleaned up, how 
to run it, and how to integrate it into a recurring maintenance workflow.
+
+
+What gets cleaned up
+--------------------
+
+The cleanup command operates only on **task state** rows in the 
``MetastoreStateBackend``. Asset state rows are never touched by this command. 
Asset state rows are removed only by the orphan sweep when an asset is 
deactivated (see :ref:`state-store`).

Review Comment:
   ```suggestion
   The cleanup command operates only on **task state** rows in the default 
state backend. Asset state rows are never touched by this command. Asset state 
rows are removed only by the orphan sweep when an asset is deactivated (see 
:ref:`state-store`).
   ```



##########
airflow-core/docs/administration-and-deployment/state-cleanup.rst:
##########
@@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-cleanup:
+
+State Cleanup
+=============
+
+.. versionadded:: 3.3
+
+Airflow does not automatically purge task state rows on a schedule. Cleanup 
(also known as "garbage collection") is your responsibility and must be 
triggered explicitly via the CLI. This page explains what gets cleaned up, how 
to run it, and how to integrate it into a recurring maintenance workflow.
+
+
+What gets cleaned up
+--------------------
+
+The cleanup command operates only on **task state** rows in the 
``MetastoreStateBackend``. Asset state rows are never touched by this command. 
Asset state rows are removed only by the orphan sweep when an asset is 
deactivated (see :ref:`state-store`).
+
+A task state row is eligible for deletion when its ``expires_at`` timestamp is 
in the past. ``expires_at`` is computed on the worker at write time:
+
+* Keys written with an explicit ``retention=timedelta(...)`` expire after that 
duration from the time of the write.
+* Keys written with ``retention=None`` (the default) pick up an expiry based 
on ``[state_store] default_retention_days``. If that value is ``> 0``, the key 
expires that many days after the write.
+* Keys written with ``retention=NEVER_EXPIRE`` have ``expires_at = NULL`` and 
a flag that marks them as permanent. They are **never** deleted by this command 
regardless of configuration.
+
+If ``[state_store] default_retention_days = 0``, keys written without an 
explicit retention have ``expires_at = NULL`` (no expiry) and are also skipped. 
Only keys with a non-null, past ``expires_at`` are removed.
+
+.. note::
+
+   Custom backends (``[state_store] backend`` set to anything other than the 
default) are **explicitly skipped**. The cleanup command prints a message and 
exits cleanly without deleting anything. If your custom backend needs its own 
retention logic, implement it in ``BaseStateBackend.cleanup()`` and call it 
from your own maintenance process.
+
+
+Running cleanup
+---------------
+
+The command is::
+
+    airflow state-store cleanup-task-states
+
+It reads ``[state_store] default_retention_days`` and ``[state_store] 
state_cleanup_batch_size`` from the ``airflow.cfg`` file, then deletes all 
eligible rows.
+
+**Dry run**
+
+Use ``--dry-run`` to preview what would be deleted without removing anything::
+
+    airflow state-store cleanup-task-states --dry-run
+
+The output lists every row that would be deleted, grouped by dag, run, task, 
map index, and key.
+
+**Batching**
+
+By default (``state_cleanup_batch_size = 0``) all eligible rows are deleted in 
a single statement. On deployments with large ``task_state`` tables, set a 
batch size to reduce lock duration per transaction::
+
+    # airflow.cfg
+    [state_store]
+    state_cleanup_batch_size = 10000
+
+The command then deletes rows in batches of 10,000, committing after each 
batch, until no eligible rows remain.
+
+
+Scheduling regular cleanup

Review Comment:
   Umm, I do not think we should mention "schedule"



##########
airflow-core/docs/administration-and-deployment/state-cleanup.rst:
##########
@@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-cleanup:
+
+State Cleanup
+=============
+
+.. versionadded:: 3.3
+
+Airflow does not automatically purge task state rows on a schedule. Cleanup 
(also known as "garbage collection") is your responsibility and must be 
triggered explicitly via the CLI. This page explains what gets cleaned up, how 
to run it, and how to integrate it into a recurring maintenance workflow.
+
+
+What gets cleaned up
+--------------------
+
+The cleanup command operates only on **task state** rows in the 
``MetastoreStateBackend``. Asset state rows are never touched by this command. 
Asset state rows are removed only by the orphan sweep when an asset is 
deactivated (see :ref:`state-store`).
+
+A task state row is eligible for deletion when its ``expires_at`` timestamp is 
in the past. ``expires_at`` is computed on the worker at write time:
+
+* Keys written with an explicit ``retention=timedelta(...)`` expire after that 
duration from the time of the write.
+* Keys written with ``retention=None`` (the default) pick up an expiry based 
on ``[state_store] default_retention_days``. If that value is ``> 0``, the key 
expires that many days after the write.

Review Comment:
   ```suggestion
   * Keys written with ``retention=None`` (the default) pick up an expiry based 
on ``[state_store] default_retention_days``. 
   ```
   
   Negative isn't possible
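
Since `default_retention_days` cannot be negative, the retention rules reduce to three cases. A minimal sketch of how `expires_at` might be derived on the worker, assuming a local `NEVER_EXPIRE` stand-in for the real sentinel and ignoring the separate "permanent" flag the docs mention; `compute_expires_at` is a hypothetical name, not Airflow's code.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

# Local stand-in for airflow.sdk.execution_time.context.NEVER_EXPIRE,
# so this sketch runs without Airflow installed.
NEVER_EXPIRE = object()


def compute_expires_at(
    retention: Union[timedelta, object, None],
    default_retention_days: int,
    now: datetime,
) -> Optional[datetime]:
    """Hypothetical sketch of the worker-side expiry rules for one write."""
    if retention is NEVER_EXPIRE:
        # Permanent key: no expiry, never removed by cleanup.
        return None
    if retention is None:
        # Fall back to the global default; 0 disables expiry.
        if default_retention_days == 0:
            return None
        return now + timedelta(days=default_retention_days)
    if isinstance(retention, timedelta):
        return now + retention
    # Plain integers (e.g. retention=7) are rejected, as the docs describe.
    raise TypeError("retention must be a timedelta, NEVER_EXPIRE, or None")


now = datetime(2025, 1, 1, tzinfo=timezone.utc)  # worker clock, in UTC
print(compute_expires_at(timedelta(hours=6), 30, now))  # 2025-01-01 06:00:00+00:00
```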



##########
airflow-core/docs/administration-and-deployment/state-cleanup.rst:
##########
@@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-cleanup:
+
+State Cleanup
+=============
+
+.. versionadded:: 3.3
+
+Airflow does not automatically purge task state rows on a schedule. Cleanup 
(also known as "garbage collection") is your responsibility and must be 
triggered explicitly via the CLI. This page explains what gets cleaned up, how 
to run it, and how to integrate it into a recurring maintenance workflow.
+
+
+What gets cleaned up
+--------------------
+
+The cleanup command operates only on **task state** rows in the 
``MetastoreStateBackend``. Asset state rows are never touched by this command. 
Asset state rows are removed only by the orphan sweep when an asset is 
deactivated (see :ref:`state-store`).
+
+A task state row is eligible for deletion when its ``expires_at`` timestamp is 
in the past. ``expires_at`` is computed on the worker at write time:
+
+* Keys written with an explicit ``retention=timedelta(...)`` expire after that 
duration from the time of the write.
+* Keys written with ``retention=None`` (the default) pick up an expiry based 
on ``[state_store] default_retention_days``. If that value is ``> 0``, the key 
expires that many days after the write.
+* Keys written with ``retention=NEVER_EXPIRE`` have ``expires_at = NULL`` and 
a flag that marks them as permanent. They are **never** deleted by this command 
regardless of configuration.
+
+If ``[state_store] default_retention_days = 0``, keys written without an 
explicit retention have ``expires_at = NULL`` (no expiry) and are also skipped. 
Only keys with a non-null, past ``expires_at`` are removed.
+
+.. note::
+
+   Custom backends (``[state_store] backend`` set to anything other than the 
default) are **explicitly skipped**. The cleanup command prints a message and 
exits cleanly without deleting anything. If your custom backend needs its own 
retention logic, implement it in ``BaseStateBackend.cleanup()`` and call it 
from your own maintenance process.
+
+
+Running cleanup
+---------------
+
+The command is::
+
+    airflow state-store cleanup-task-states
+
+It reads ``[state_store] default_retention_days`` and ``[state_store] 
state_cleanup_batch_size`` from the ``airflow.cfg`` file, then deletes all 
eligible rows.
+
+**Dry run**
+
+Use ``--dry-run`` to preview what would be deleted without removing anything::
+
+    airflow state-store cleanup-task-states --dry-run
+
+The output lists every row that would be deleted, grouped by dag, run, task, 
map index, and key.
+
+**Batching**
+
+By default (``state_cleanup_batch_size = 0``) all eligible rows are deleted in 
a single statement. On deployments with large ``task_state`` tables, set a 
batch size to reduce lock duration per transaction::
+
+    # airflow.cfg
+    [state_store]
+    state_cleanup_batch_size = 10000
+
+The command then deletes rows in batches of 10,000, committing after each 
batch, until no eligible rows remain.
+
+
+Scheduling regular cleanup
+--------------------------
+
+Because Airflow does not run cleanup automatically, you need to schedule it. 
The recommended approaches are:
+
+**System cron**
+
+Add a cron entry on the host that runs the Airflow scheduler or API server:
+
+.. code-block:: bash
+
+    # Run cleanup daily at 02:00
+    0 2 * * * airflow state-store cleanup-task-states
+
+**Airflow Dag**
+
+You can also drive cleanup from a Dag so it is visible in the UI and benefits 
from Airflow's retry and alerting:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow.sdk import DAG
+    from airflow.providers.standard.operators.bash import BashOperator
+
+    with DAG(
+        "state_store_cleanup",
+        schedule=timedelta(days=1),
+        catchup=False,
+    ):
+        BashOperator(
+            task_id="cleanup_task_states",
+            bash_command="airflow state-store cleanup-task-states",
+        )

Review Comment:
   Unsure if we need this section. We should just focus on the CLI command and 
let users handle scheduling as they like.



##########
airflow-core/docs/administration-and-deployment/state-store.rst:
##########
@@ -0,0 +1,255 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..  http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-store:
+
+State Store Configuration
+==========================
+
+.. versionadded:: 3.3
+
+The state store is the persistence layer for :doc:`task state 
</core-concepts/task-state>` and :doc:`asset state 
</core-concepts/asset-state>`. By default, both are stored in the Airflow 
metadata database. This page describes the available configuration options, 
garbage-collection semantics, and how to provide a custom backend.
+
+Configuration reference
+-----------------------
+
+All options live under the ``[state_store]`` section of ``airflow.cfg``.
+
+.. note::
+
+   The config section is ``[state_store]``, **not** ``[task_state]``.
+
+``backend``
+~~~~~~~~~~~
+
+Full dotted path to a class that implements 
:class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in 
metastore backend.
+
+.. code-block:: ini
+
+    [state_store]
+    backend = mypackage.state.CustomStateBackend
+
+``default_retention_days``
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of days to retain **task state** rows after their last update. Rows 
older than this are deleted during the next garbage collection pass.
+
+* Set to ``0`` to disable time-based cleanup entirely.
+* Default: ``30``.
+* This setting does **not** apply to asset state rows.
+
+.. code-block:: ini
+
+    [state_store]
+    default_retention_days = 30
+
+``clear_on_success``
+~~~~~~~~~~~~~~~~~~~~
+
+When ``True``, all task state keys for a task instance are automatically 
deleted when that task instance moves to the ``success`` state. Defaults to 
``False``, which preserves task state after success for observability (e.g. the 
submitted job ID or the last row count is still readable from the UI or REST API 
after the run completes).
+
+.. important::
+
+   ``clear_on_success`` clears **task state only**. It has no effect on asset 
state. Asset state is scoped to the asset rather than the task instance and 
must be cleared explicitly.
+
+.. code-block:: ini
+
+    [state_store]
+    clear_on_success = False
+
+``state_cleanup_batch_size``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of rows deleted per batch during garbage collection cleanup. Set to 
``0`` (default) to delete all matching rows in a single statement. Tune this on 
deployments with large ``task_state`` tables to reduce lock contention.
+
+.. code-block:: ini
+
+    [state_store]
+    state_cleanup_batch_size = 10000
+
+Worker-side backend (``[workers] state_backend``)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+A separate, optional config key under ``[workers]`` lets you route task state 
and asset state values through a worker-side backend before they reach the API 
server.
+
+.. code-block:: ini
+
+    [workers]
+    state_backend = mypackage.state.S3StateBackend
+
+When this is set, ``TaskStateAccessor.set()`` calls 
``serialize_task_state_to_ref()`` on the worker-side backend before sending the 
returned value (a reference to the actual storage) to the Execution API, and 
``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the 
stored reference from the Execution API. See `Custom worker-side backends`_ 
below.
+
+
+Garbage collection semantics
+-----------------------------
+
+The cleanup process, also known as "garbage collection", is triggered with the 
Airflow CLI command ``airflow state-store cleanup-task-states``. It removes state 
rows according to the following rules:
+
+**Time-based expiry (task state only)**
+  Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on 
the *worker* at write time, not by the server.

Review Comment:
   ```suggestion
     Expired rows, i.e. rows whose ``expires_at < now()``, are deleted. 
``expires_at`` is computed on the *worker* at write time, not by the server.
   ```
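
To illustrate the `expires_at < now()` rule together with `state_cleanup_batch_size`, here is a minimal sketch against an in-memory SQLite table. The schema (a `task_state` table with a `key` column and an epoch-seconds `expires_at` column) and the `cleanup_expired` function are hypothetical; Airflow's real cleanup runs against the metadata DB through its own models.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def cleanup_expired(conn: sqlite3.Connection, now: datetime, batch_size: int = 0) -> int:
    """Hypothetical sketch: delete rows with a non-null, past expires_at.

    batch_size = 0 deletes everything in one statement; a positive value
    deletes and commits one batch at a time until nothing is left.
    """
    cutoff = now.timestamp()  # expires_at is stored as epoch seconds here
    if batch_size <= 0:
        cur = conn.execute(
            "DELETE FROM task_state WHERE expires_at IS NOT NULL AND expires_at < ?",
            (cutoff,),
        )
        deleted = cur.rowcount
        conn.commit()
        return deleted
    total = 0
    while True:
        cur = conn.execute(
            "DELETE FROM task_state WHERE rowid IN ("
            "SELECT rowid FROM task_state "
            "WHERE expires_at IS NOT NULL AND expires_at < ? LIMIT ?)",
            (cutoff, batch_size),
        )
        deleted = cur.rowcount
        conn.commit()  # commit per batch to keep each transaction short
        if deleted == 0:
            return total
        total += deleted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_state (key TEXT, expires_at REAL)")
now = datetime(2025, 1, 10, tzinfo=timezone.utc)
rows = [(f"old_{i}", (now - timedelta(days=1)).timestamp()) for i in range(5)]
rows += [("future", (now + timedelta(days=1)).timestamp()), ("forever", None)]
conn.executemany("INSERT INTO task_state VALUES (?, ?)", rows)
conn.commit()
print(cleanup_expired(conn, now, batch_size=2))  # 5
```

Rows with a `NULL` expiry and rows expiring in the future survive; a second run finds nothing left to delete.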



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, 
task state is available through the ``context`` dictionary via the 
``task_state`` key. From there, you can retrieve, set, or delete the value for 
a specific key, or clear all keys for the task instance. In this example, the 
value stored under ``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+    @task
+    def my_task(**context):
+        # Retrieve task_state from context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12)} o'clock"

Review Comment:
   Let's mention `delete` and `clear` here too
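
To show `delete` and `clear` alongside `get` and `set`, here is a sketch using a hypothetical in-memory stand-in (`InMemoryTaskState`) that mimics the documented semantics. It is not Airflow's `TaskStateAccessor`; retention and the API round trip are omitted.

```python
from typing import Any, Dict, Optional


class InMemoryTaskState:
    """Hypothetical stand-in for context["task_state"] (no retention, no API calls)."""

    def __init__(self, store: Dict[int, Dict[str, Any]], map_index: int = -1) -> None:
        self._store = store  # shared by all map indices of one task
        self._map_index = map_index

    def get(self, key: str) -> Optional[Any]:
        return self._store.get(self._map_index, {}).get(key)

    def set(self, key: str, value: Any) -> None:
        self._store.setdefault(self._map_index, {})[key] = value

    def delete(self, key: str) -> None:
        # Deleting a missing key is a no-op.
        self._store.get(self._map_index, {}).pop(key, None)

    def clear(self, all_map_indices: bool = False) -> None:
        if all_map_indices:
            self._store.clear()  # fleet-wide reset across every map index
        else:
            self._store.pop(self._map_index, None)  # this map index only


store: Dict[int, Dict[str, Any]] = {}
ts0 = InMemoryTaskState(store, map_index=0)
ts1 = InMemoryTaskState(store, map_index=1)

ts0.set("job_id", "abc")
ts0.set("offset", 100)
ts1.set("job_id", "xyz")

ts0.delete("offset")   # removes one key
ts0.delete("missing")  # no-op
ts0.clear()            # wipes map index 0; map index 1 keeps "job_id"
print(ts1.get("job_id"))  # xyz
```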



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, 
task state is available through the ``context`` dictionary via the 
``task_state`` key. From there, you can retrieve, set, or delete the value for 
a specific key, or clear all keys for the task instance. In this example, the 
value stored under ``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+    @task
+    def my_task(**context):
+        # Retrieve task_state from context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+-------------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored string value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # returns str or None
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites the value for the specified key. ``value`` can be any 
JSON-compatible type, including:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``

Review Comment:
   Maybe add a Note that `None` cannot be set
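
A minimal sketch of what such a note could describe, assuming values are JSON-encoded before they are sent and that `None` is rejected up front. `validate_state_value` is hypothetical, and the rationale in the comment is an assumption, not a documented design decision.

```python
import json
from typing import Any


def validate_state_value(value: Any) -> str:
    """Hypothetical pre-write check: reject None and non-JSON values."""
    if value is None:
        # Assumed rationale: get() returns None for a missing key, so a
        # stored None would be indistinguishable from "not set".
        raise ValueError("task state values cannot be None; use delete() instead")
    try:
        return json.dumps(value)
    except TypeError as exc:
        raise TypeError(f"value is not JSON-serializable: {value!r}") from exc


print(validate_state_value({"rows": [1, 2]}))  # {"rows": [1, 2]}
```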



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, 
task state is available through the ``context`` dictionary via the 
``task_state`` key. From there, you can retrieve, set, or delete the value for 
a specific key, or clear all keys for the task instance. In this example, the 
value stored under ``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+    @task
+    def my_task(**context):
+        # Retrieve task_state from context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+-------------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored string value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # returns str or None

Review Comment:
   Mention an example with `default` too
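
If `get` accepts a `default` argument, as this comment implies (the exact signature is an assumption here), the example could show the fallback for a missing key. A hypothetical in-memory stand-in, `FakeTaskState`, keeps the snippet runnable without Airflow.

```python
from typing import Any, Dict, Optional


class FakeTaskState:
    """Hypothetical stand-in; assumes get(key, default=None)."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
        # Return the stored value, or `default` when the key does not exist.
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


task_state = FakeTaskState()
offset = task_state.get("offset", default=0)  # key missing, so this falls back to 0
task_state.set("offset", offset + 100)
print(task_state.get("offset", default=0))  # 100
```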



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, 
task state is available through the ``context`` dictionary via the 
``task_state`` key. From there, you can retrieve, set, or delete the value for 
a specific key, or clear all keys for the task instance. In this example, the 
value stored under ``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+    @task
+    def my_task(**context):
+        # Retrieve task_state from context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+-------------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored string value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # returns str or None
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites the value for the specified key. ``value`` can be any 
JSON-compatible type, including:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``
+
+The optional ``retention`` argument controls when the key expires:
+
+* ``timedelta(...)``: expire after the given duration from the time of the 
write (e.g. ``timedelta(hours=6)``).  The expiry timestamp is computed on the 
worker in UTC before the value is sent to the API server.
+* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage 
collection, regardless of the global ``[state_store] default_retention_days`` 
setting.
+* ``None`` (default): fall back to the global ``[state_store] 
default_retention_days`` config.
+
+.. important::
+
+   ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain 
integer number of days.  Passing an integer raises a ``TypeError``.
+
+   .. code-block:: python
+
+       # correct
+       task_state.set("key", "val", retention=timedelta(days=7))
+
+       # wrong — raises TypeError
+       task_state.set("key", "val", retention=7)
+
+``NEVER_EXPIRE`` sentinel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``:
+
+.. code-block:: python
+
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE
+
+    task_state.set("job_id", job_id, retention=NEVER_EXPIRE)
+
+``delete(key)``
+~~~~~~~~~~~~~~~
+
+Deletes a single key.  No-op if the key does not exist.
+
+.. code-block:: python
+
+    task_state.delete("job_id")
+
+``clear(all_map_indices=False)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Deletes *all* state keys for this task instance.
+
+For :doc:`mapped tasks <dynamic-task-mapping>`, the default clears only the 
current map index.  Pass ``all_map_indices=True`` to wipe state across 
**every** mapped instance of the task (fleet-wide reset).
+
+.. code-block:: python
+
+    # clear only this map index
+    task_state.clear()
+
+    # clear all map indices (fleet-wide)
+    task_state.clear(all_map_indices=True)
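
To make the map-index behaviour concrete, the sketch below models task-state keys as rows indexed by task-instance identity plus key. It is a toy illustration, not Airflow's storage code: ``TaskInstanceScope`` and ``ToyStateStore`` are hypothetical names, and it assumes that ``all_map_indices=True`` stays within the current Dag run, since ``run_id`` is part of the scope.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class TaskInstanceScope:
    # Hypothetical identity: task state is scoped to
    # dag_id + run_id + task_id + map_index.
    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1  # -1 marks an unmapped task instance in Airflow


class ToyStateStore:
    """Illustrative in-memory model of task-state scoping, not Airflow code."""

    def __init__(self) -> None:
        self._rows: dict[tuple[TaskInstanceScope, str], Any] = {}

    def get(self, scope: TaskInstanceScope, key: str) -> Any:
        return self._rows.get((scope, key))

    def set(self, scope: TaskInstanceScope, key: str, value: Any) -> None:
        self._rows[(scope, key)] = value

    def delete(self, scope: TaskInstanceScope, key: str) -> None:
        self._rows.pop((scope, key), None)  # no-op if the key is absent

    def clear(self, scope: TaskInstanceScope, *, all_map_indices: bool = False) -> None:
        def in_scope(other: TaskInstanceScope) -> bool:
            if all_map_indices:
                # Fleet-wide reset: ignore map_index, match the same task in this run.
                return (other.dag_id, other.run_id, other.task_id) == (
                    scope.dag_id,
                    scope.run_id,
                    scope.task_id,
                )
            # Default: only the exact task instance (this map index).
            return other == scope

        # Collect matching keys first so the dict is not mutated while iterating.
        for row_key in [k for k in self._rows if in_scope(k[0])]:
            del self._rows[row_key]
```

Keying rows on the full identity tuple is what makes the default ``clear()`` safe for mapped tasks: sibling map indices live under different keys and are untouched unless the caller opts in.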
+
+
+Use Cases
+---------
+
+External job resumption
+~~~~~~~~~~~~~~~~~~~~~~~
+
+A common pattern for long-running external jobs: check whether a job ID is 
already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives
+the default retention window.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, task
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE
+
+
+    with DAG("spark_job_dag", schedule=None):
+
+        @task
+        def run_spark_job(**context):
+            task_state = context["task_state"]
+
+            # Check for an already-submitted job from a previous attempt.
+            job_id = task_state.get("job_id")
+            if job_id is None:
+                job_id = spark_client.submit_job(...)
+                # Store with NEVER_EXPIRE so the key is not garbage-collected
+                # before the job finishes.
+                task_state.set("job_id", str(job_id), retention=NEVER_EXPIRE)
+
+            # Reattach to the job and wait for completion.
+            result = spark_client.wait_for_completion(job_id)
+            return result
+
+On a retry, the task finds the stored ``job_id`` and reattaches instead of submitting a duplicate job. For another example of this pattern, see `example_task_state.py <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_task_state.py>`_.
+
+Intra-task checkpointing
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+For tasks that process paginated or batched data, store the last-completed 
offset so a retry can resume mid-stream rather than restarting from the 
beginning.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, task
+
+
+    with DAG("paginated_ingest", schedule="@daily"):
+
+        @task
+        def ingest_pages(**context):
+            # Retrieve the task_state
+            task_state = context["task_state"]
+            raw = task_state.get("last_page")
+
+            start_page = raw + 1 if raw is not None else 1
+
+            for page in range(start_page, total_pages + 1):
+                fetch_and_load(page)
+                task_state.set("last_page", page)  # checkpoint: a retry resumes after this page
+
+
+On a retry, the task reads ``last_page`` and skips pages that were already 
processed.
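
Because the resume logic depends only on ``get`` and ``set``, it can be exercised without a running deployment by swapping in a dict-backed stand-in. This is a minimal sketch: ``InMemoryTaskState`` and ``flaky_load`` are hypothetical test helpers, and the sketch assumes ``get`` returns the integer that was stored, as the ``raw + 1`` arithmetic above already does.

```python
from __future__ import annotations

from typing import Any, Callable


class InMemoryTaskState:
    """Hypothetical dict-backed stand-in for context["task_state"] (get/set only)."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


def ingest_pages(task_state, total_pages: int, fetch_and_load: Callable[[int], None]) -> None:
    # Same checkpoint logic as the task above, with its free names passed in.
    raw = task_state.get("last_page")
    start_page = raw + 1 if raw is not None else 1
    for page in range(start_page, total_pages + 1):
        fetch_and_load(page)
        task_state.set("last_page", page)


loaded: list[int] = []
attempt = {"crashed": False}


def flaky_load(page: int) -> None:
    # Hypothetical loader that "crashes" on page 3 during the first attempt only.
    if page == 3 and not attempt["crashed"]:
        attempt["crashed"] = True
        raise RuntimeError("simulated worker crash")
    loaded.append(page)


state = InMemoryTaskState()
try:
    ingest_pages(state, 5, flaky_load)  # first attempt: pages 1-2, then crash
except RuntimeError:
    pass
ingest_pages(state, 5, flaky_load)  # retry resumes at page 3
print(loaded)  # [1, 2, 3, 4, 5]
```

Each page is loaded exactly once here because the simulated crash happens before the checkpoint write. A crash between ``fetch_and_load(page)`` and ``task_state.set(...)`` would replay that one page on retry, so ``fetch_and_load`` should be idempotent.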
+
+Progress metadata
+~~~~~~~~~~~~~~~~~
+
+Task state can expose in-progress metrics for observability — row counts, 
status strings, or lightweight JSON payloads — without requiring XCom or an 
external system.
+
+.. code-block:: python
+
+    import json
+
+    from airflow.sdk import DAG, task
+
+
+    with DAG("row_ingest", schedule="@hourly"):
+
+        @task
+        def ingest_rows(**context):
+            task_state = context["task_state"]
+            total = 0
+
+            for batch in get_batches():
+                load(batch)
+                total += len(batch)
+                task_state.set(
+                    "progress",
+                    {
+                        "rows_loaded": total,
+                        "status": "running"
+                    },
+                )
+
+            task_state.set(
+                "progress",
+                json.dumps({
+                    "rows_loaded": total,
+                    "status": "done"
+                }),

Review Comment:
   ```suggestion
                   },
   ```



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary under the ``task_state`` key. Through it you can retrieve, set, delete, or clear the values stored under specific keys. In this example, the value stored under ``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+    @task
+    def my_task(**context):
+        # Retrieve task_state from context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+---------
+
+``get(key)``

Review Comment:
   Now it's `get(key, default)`.



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary under the ``task_state`` key. Through it you can retrieve, set, delete, or clear the values stored under specific keys. In this example, the value stored under ``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+    @task
+    def my_task(**context):
+        # Retrieve task_state from context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+---------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # returns the stored value, or None
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites the value for the specified key. ``value`` can be any JSON-compatible type, including:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``
+
+The optional ``retention`` argument controls when the key expires:
+
+* ``timedelta(...)``: expire after the given duration from the time of the 
write (e.g. ``timedelta(hours=6)``).  The expiry timestamp is computed on the 
worker in UTC before the value is sent to the API server.
+* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage 
collection, regardless of the global ``[state_store] default_retention_days`` 
setting.
+* ``None`` (default): fall back to the global ``[state_store] 
default_retention_days`` config.
+
+.. important::
+
+   ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain 
integer number of days.  Passing an integer raises a ``TypeError``.
+
+   .. code-block:: python
+
+       from datetime import timedelta
+
+       # correct
+       task_state.set("key", "val", retention=timedelta(days=7))
+
+       # wrong — raises TypeError
+       task_state.set("key", "val", retention=7)
+
+``NEVER_EXPIRE`` sentinel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``:
+
+.. code-block:: python
+
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE
+
+    task_state.set("job_id", job_id, retention=NEVER_EXPIRE)
+
+``delete(key)``
+~~~~~~~~~~~~~~~
+
+Deletes a single key.  No-op if the key does not exist.
+
+.. code-block:: python
+
+    task_state.delete("job_id")
+
+``clear(all_map_indices=False)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Deletes *all* state keys for this task instance.
+
+For :doc:`mapped tasks <dynamic-task-mapping>`, the default clears only the 
current map index.  Pass ``all_map_indices=True`` to wipe state across 
**every** mapped instance of the task (fleet-wide reset).
+
+.. code-block:: python
+
+    # clear only this map index
+    task_state.clear()
+
+    # clear all map indices (fleet-wide)
+    task_state.clear(all_map_indices=True)
+
+
+Use Cases
+---------
+
+External job resumption
+~~~~~~~~~~~~~~~~~~~~~~~
+
+A common pattern for long-running external jobs: check whether a job ID is 
already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives
+the default retention window.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, task
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE
+
+
+    with DAG("spark_job_dag", schedule=None):
+
+        @task
+        def run_spark_job(**context):
+            task_state = context["task_state"]
+
+            # Check for an already-submitted job from a previous attempt.
+            job_id = task_state.get("job_id")
+            if job_id is None:
+                job_id = spark_client.submit_job(...)
+                # Store with NEVER_EXPIRE so the key is not garbage-collected
+                # before the job finishes.
+                task_state.set("job_id", str(job_id), retention=NEVER_EXPIRE)

Review Comment:
   ```suggestion
                   task_state.set("job_id", job_id, retention=NEVER_EXPIRE)
   ```



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary under the ``task_state`` key. Through it you can retrieve, set, delete, or clear the values stored under specific keys. In this example, the value stored under ``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+    @task
+    def my_task(**context):
+        # Retrieve task_state from context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+---------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # returns the stored value, or None
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites the value for the specified key. ``value`` can be any JSON-compatible type, including:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``
+
+The optional ``retention`` argument controls when the key expires:
+
+* ``timedelta(...)``: expire after the given duration from the time of the 
write (e.g. ``timedelta(hours=6)``).  The expiry timestamp is computed on the 
worker in UTC before the value is sent to the API server.
+* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage 
collection, regardless of the global ``[state_store] default_retention_days`` 
setting.
+* ``None`` (default): fall back to the global ``[state_store] 
default_retention_days`` config.
+
+.. important::
+
+   ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain 
integer number of days.  Passing an integer raises a ``TypeError``.
+
+   .. code-block:: python
+
+       from datetime import timedelta
+
+       # correct
+       task_state.set("key", "val", retention=timedelta(days=7))
+
+       # wrong — raises TypeError
+       task_state.set("key", "val", retention=7)
+
+``NEVER_EXPIRE`` sentinel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``:
+
+.. code-block:: python
+
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE

Review Comment:
   It's possible:
   
   ```
   from airflow.sdk import NEVER_EXPIRE
   NEVER_EXPIRE
   Out[3]: datetime.timedelta(days=999999999, seconds=86399, 
microseconds=999999)
   ```



##########
airflow-core/docs/administration-and-deployment/state-cleanup.rst:
##########
@@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-cleanup:
+
+State Cleanup
+=============
+
+.. versionadded:: 3.3
+
+Airflow does not automatically purge task state rows on a schedule. Cleanup (also known as "garbage collection") is your responsibility and must be triggered explicitly via the CLI. This page explains what gets cleaned up, how to run it, and how to integrate it into a recurring maintenance workflow.
+
+
+What gets cleaned up
+--------------------
+
+The cleanup command operates only on **task state** rows in the ``MetastoreStateBackend``. Asset state rows are never touched by this command; they are removed only by the orphan sweep when an asset is deactivated (see :ref:`state-store`).
+
+A task state row is eligible for deletion when its ``expires_at`` timestamp is 
in the past. ``expires_at`` is computed on the worker at write time:
+
+* Keys written with an explicit ``retention=timedelta(...)`` expire after that 
duration from the time of the write.
+* Keys written with ``retention=None`` (the default) pick up an expiry based 
on ``[state_store] default_retention_days``. If that value is ``> 0``, the key 
expires that many days after the write.
+* Keys written with ``retention=NEVER_EXPIRE`` have ``expires_at = NULL`` and 
a flag that marks them as permanent. They are **never** deleted by this command 
regardless of configuration.
+
+If ``[state_store] default_retention_days = 0``, keys written without an 
explicit retention have ``expires_at = NULL`` (no expiry) and are also skipped. 
Only keys with a non-null, past ``expires_at`` are removed.
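
The expiry rules above can be summarised as a small pure function. This is a minimal sketch, not Airflow's implementation: ``compute_expiry`` and ``is_eligible_for_cleanup`` are hypothetical names, and ``NEVER_EXPIRE`` is modelled locally as ``timedelta.max`` on the assumption that the real sentinel has that value.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Hypothetical local stand-in for Airflow's NEVER_EXPIRE sentinel,
# assumed to equal timedelta.max.
NEVER_EXPIRE = timedelta.max


def compute_expiry(
    retention: timedelta | None,
    default_retention_days: int,
    now: datetime | None = None,
) -> tuple[datetime | None, bool]:
    """Return (expires_at, is_permanent) for one write, following the rules above."""
    if retention is not None and not isinstance(retention, timedelta):
        # A plain integer number of days is rejected, as documented.
        raise TypeError("retention must be a datetime.timedelta")
    if now is None:
        now = datetime.now(timezone.utc)  # computed on the worker, in UTC
    if retention == NEVER_EXPIRE:
        # Checked before any arithmetic: now + timedelta.max would overflow.
        return None, True
    if retention is not None:
        return now + retention, False
    if default_retention_days > 0:
        return now + timedelta(days=default_retention_days), False
    return None, False  # default_retention_days = 0: no expiry, not permanent


def is_eligible_for_cleanup(expires_at: datetime | None, now: datetime) -> bool:
    # Only rows with a non-null, past expires_at are removed.
    return expires_at is not None and expires_at < now
```

Returning the permanence flag separately from ``expires_at`` mirrors the distinction drawn above: both ``NEVER_EXPIRE`` keys and keys written while ``default_retention_days = 0`` end up with a ``NULL`` expiry, but only the former are marked permanent.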
+
+.. note::
+
+   Custom backends (``[state_store] backend`` set to anything other than the 
default) are **explicitly skipped**. The cleanup command prints a message and 
exits cleanly without deleting anything. If your custom backend needs its own 
retention logic, implement it in ``BaseStateBackend.cleanup()`` and call it 
from your own maintenance process.

Review Comment:
   ```suggestion
      Custom backends (``[state_store] backend`` set to anything other than the 
default) are **explicitly not cleaned up**. The cleanup command prints a 
message and exits cleanly without deleting anything. If your custom backend 
needs its own retention logic, implement it in ``BaseStateBackend.cleanup()`` 
and call it from your own maintenance process.
   ```
   
   Better?



##########
airflow-core/docs/core-concepts/task-and-asset-state.rst:
##########
@@ -0,0 +1,80 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:state-overview:
+
+Task and Asset State Overview
+=============================
+
+.. versionadded:: 3.3
+
+Airflow has always modeled tasks as stateless, idempotent units of work. A growing class of workloads, however, requires a small amount of data to be persisted outside of a task's return value, such as a submitted job ID that must survive a worker crash, a watermark that advances run-by-run, or a row counter exposed for observability. Task state and Asset state fill that gap without touching the XCom or Variable systems.

Review Comment:
   ```suggestion
   Airflow has always modeled tasks as stateless, idempotent units of work. A 
growing class of workloads, however, require a small amount of data to be 
persisted outside of a task's return value, like a submitted job ID that must 
survive a worker crash, a watermark that advances run-by-run, or a row counter 
exposed for observability. Task state and Asset state fill that gap without 
touching the XCom or Variable systems.
   ```



##########
airflow-core/docs/administration-and-deployment/state-store.rst:
##########
@@ -0,0 +1,255 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..  http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-store:
+
+State Store Configuration
+==========================
+
+.. versionadded:: 3.3
+
+The state store is the persistence layer for :doc:`task state 
</core-concepts/task-state>` and :doc:`asset state 
</core-concepts/asset-state>`. By default, both are stored in the Airflow 
metadata database. This page describes the available configuration options, 
garbage-collection semantics, and how to provide a custom backend.
+
+Configuration reference
+-----------------------
+
+All options live under the ``[state_store]`` section of ``airflow.cfg``.
+
+.. note::
+
+   The config section is ``[state_store]``, **not** ``[task_state]``.
+
+``backend``
+~~~~~~~~~~~
+
+Full dotted path to a class that implements 
:class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in 
metastore backend.
+
+.. code-block:: ini
+
+    [state_store]
+    backend = mypackage.state.CustomStateBackend
+
+``default_retention_days``
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of days to retain **task state** rows after their last update. Rows 
older than this are deleted during the next garbage collection pass.
+
+* Set to ``0`` to disable time-based cleanup entirely.
+* Default: ``30``.
+* This setting does **not** apply to asset state rows.
+
+.. code-block:: ini
+
+    [state_store]
+    default_retention_days = 30
+
+``clear_on_success``
+~~~~~~~~~~~~~~~~~~~~
+
+When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes).
+
+.. important::
+
+   ``clear_on_success`` clears **task state only**. It has no effect on asset 
state. Asset state is scoped to the asset rather than the task instance and 
must be cleared explicitly.
+
+.. code-block:: ini
+
+    [state_store]
+    clear_on_success = False
+
+``state_cleanup_batch_size``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of rows deleted per batch during garbage collection cleanup. Set to 
``0`` (default) to delete all matching rows in a single statement. Tune this on 
deployments with large ``task_state`` tables to reduce lock contention.
+
+.. code-block:: ini
+
+    [state_store]
+    state_cleanup_batch_size = 10000
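
To make the effect of ``state_cleanup_batch_size`` concrete, here is a sketch of batched deletion against a throwaway SQLite table. The ``task_state(id, expires_at)`` schema and the ``delete_expired`` helper are hypothetical, and the SQL Airflow actually issues against the metadata database may differ.

```python
import sqlite3


def delete_expired(conn: sqlite3.Connection, now: str, batch_size: int) -> int:
    """Delete rows whose expires_at is in the past and return how many went.

    batch_size == 0 removes everything in one statement (the documented
    default); otherwise rows go in chunks of at most batch_size, with a
    commit after each chunk so no single transaction holds locks for long.
    Timestamps are ISO-8601 strings, which compare correctly as text.
    """
    expired = "expires_at IS NOT NULL AND expires_at < ?"
    if batch_size == 0:
        cur = conn.execute(f"DELETE FROM task_state WHERE {expired}", (now,))
        conn.commit()
        return cur.rowcount

    total = 0
    while True:
        cur = conn.execute(
            f"DELETE FROM task_state WHERE id IN "
            f"(SELECT id FROM task_state WHERE {expired} LIMIT ?)",
            (now, batch_size),
        )
        conn.commit()
        total += cur.rowcount
        if cur.rowcount < batch_size:  # last, partial (or empty) batch
            return total


# Demo on the hypothetical schema: 25 expired rows, 5 live rows, 3 with no expiry.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_state (id INTEGER PRIMARY KEY, expires_at TEXT)")
conn.executemany(
    "INSERT INTO task_state (expires_at) VALUES (?)",
    [("2024-06-01T00:00:00",)] * 25 + [("2026-01-01T00:00:00",)] * 5 + [(None,)] * 3,
)
conn.commit()
deleted = delete_expired(conn, "2025-01-01T00:00:00", batch_size=10)
remaining = conn.execute("SELECT COUNT(*) FROM task_state").fetchone()[0]
print(deleted, remaining)  # 25 8
```

The trade-off is throughput versus contention: one large ``DELETE`` finishes fastest, while smaller committed batches let concurrent writers to the same table proceed between chunks.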
+
+Worker-side backend (``[workers] state_backend``)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+A separate, optional config key under ``[workers]`` lets you route task state 
and asset state values through a worker-side backend before they reach the API 
server.
+
+.. code-block:: ini
+
+    [workers]
+    state_backend = mypackage.state.S3StateBackend
+
+When this is set, ``TaskStateAccessor.set()`` calls 
``serialize_task_state_to_ref()`` on the worker-side backend before sending the 
returned value (a reference to the actual storage) to the Execution API, and 
``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the 
stored reference from the Execution API. See `Custom worker-side backends`_ 
below.
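
The reference round trip described above can be sketched as a small class. Only the two method names come from this page; the ``LocalFileStateBackend`` class, its constructor, the ``localfile:`` reference format, and the assumed signatures (value in, reference string out, and the reverse) are hypothetical, and the class deliberately does not subclass any Airflow base class.

```python
from __future__ import annotations

import json
import tempfile
import uuid
from pathlib import Path
from typing import Any


class LocalFileStateBackend:
    """Hypothetical worker-side backend; not an Airflow class.

    Method names follow this page; the signatures are assumptions.
    """

    PREFIX = "localfile:"

    def __init__(self, root: str | Path) -> None:
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def serialize_task_state_to_ref(self, value: Any) -> str:
        # Store the payload out of band; only this short reference string
        # would travel to the Execution API.
        name = f"{uuid.uuid4().hex}.json"
        (self.root / name).write_text(json.dumps(value))
        return self.PREFIX + name

    def deserialize_task_state_from_ref(self, ref: str) -> Any:
        # Resolve a reference produced by serialize_task_state_to_ref.
        if not ref.startswith(self.PREFIX):
            raise ValueError(f"unrecognised reference: {ref!r}")
        return json.loads((self.root / ref[len(self.PREFIX):]).read_text())


# Round trip in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    backend = LocalFileStateBackend(tmp)
    ref = backend.serialize_task_state_to_ref({"rows_loaded": 1200, "status": "running"})
    restored = backend.deserialize_task_state_from_ref(ref)
```

A local directory is only useful for experimenting: a retry may run on a different worker, so a real worker-side backend would need shared, durable storage (the ``S3StateBackend`` path in the config example above points in that direction).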
+
+
+Garbage collection semantics
+-----------------------------
+
+The cleanup task, also known as "garbage collection", is triggered from the Airflow CLI with ``airflow state-store cleanup-task-states``. It removes state rows according to the following rules:
+
+**Time-based expiry (task state only)**
+  Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on 
the *worker* at write time, not by the server.
+
+``default_retention_days`` **fallback (task state only)**
+  Keys written with no explicit ``retention`` (i.e. ``expires_at`` is 
``NULL``) are governed by the global ``default_retention_days`` setting. When 
this setting is positive, the garbage collection job treats such rows as 
expiring ``default_retention_days`` days after their last update.
+
+``NEVER_EXPIRE`` **keys**
+  Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = 
NULL`` and a flag that tells the garbage collection to skip them 
unconditionally. They are never deleted by time-based cleanup, regardless of 
``default_retention_days``.
+
+**Orphan sweep (asset state)**
+  Asset state rows for assets that no longer have an ``asset_active`` record 
are deleted during the orphan-sweep pass. This cleans up state for deactivated 
or renamed assets.
+
+.. important::
+
+   Garbage collection only works for the ``MetastoreStateBackend``. Custom 
backends are explicitly skipped.
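
The rules in this section can be modelled as plain Python predicates over rows. This is a sketch under assumptions: the ``TaskStateRow`` and ``AssetStateRow`` shapes, the ``never_expire`` flag name, and the helper functions are hypothetical, and the fallback is measured from ``updated_at`` as this section describes.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class TaskStateRow:
    # Hypothetical row shape; column names are illustrative.
    key: str
    updated_at: datetime
    expires_at: datetime | None = None
    never_expire: bool = False  # the flag written for NEVER_EXPIRE keys


@dataclass
class AssetStateRow:
    asset_id: int
    key: str


def task_rows_to_delete(
    rows: list[TaskStateRow], now: datetime, default_retention_days: int
) -> list[TaskStateRow]:
    doomed = []
    for row in rows:
        if row.never_expire:
            continue  # skipped unconditionally
        if row.expires_at is not None:
            if row.expires_at < now:
                doomed.append(row)  # time-based expiry
        elif default_retention_days > 0:
            # default_retention_days fallback, measured from the last update
            if row.updated_at < now - timedelta(days=default_retention_days):
                doomed.append(row)
    return doomed


def orphaned_asset_rows(
    rows: list[AssetStateRow], active_asset_ids: set[int]
) -> list[AssetStateRow]:
    # Orphan sweep: asset state whose asset has no asset_active record.
    return [row for row in rows if row.asset_id not in active_asset_ids]
```

Checking the permanence flag first matters: a ``NEVER_EXPIRE`` row also has a ``NULL`` expiry, so without that early ``continue`` it would fall through to the retention fallback.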
+
+
+
+Custom backends
+---------------
+
+To replace the default metastore backend, subclass 
:class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract 
methods.
+
+Abstract methods
+~~~~~~~~~~~~~~~~
+
+There are four synchronous methods and four async equivalents:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 55 45
+
+   * - Signature
+     - Description
+   * - ``def get(self, scope: StateScope, key: str, *, session: Session | None = None) -> str | None``
+     - Return the stored value, or ``None``.
+   * - ``def set(self, scope: StateScope, key: str, value: str, *, expires_at: datetime | None = None, session: Session | None = None) -> None``
+     - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None``
+       for non-expiring keys.
+   * - ``def delete(self, scope: StateScope, key: str, *, session: Session | None = None) -> None``
+     - Delete a single key; no-op if absent.
+   * - ``def clear(self, scope: StateScope, *, all_map_indices: bool = False, session: Session | None = None) -> None``
+     - Delete all keys under the scope.
+   * - ``async def aget(self, scope: StateScope, key: str, *, session: AsyncSession | None = None) -> str | None``
+     - Async variant of ``get``.
+   * - ``async def aset(self, scope: StateScope, key: str, value: str, *, expires_at: datetime | None = None, session: AsyncSession | None = None) -> None``
+     - Async variant of ``set``.
+   * - ``async def adelete(self, scope: StateScope, key: str, *, session: AsyncSession | None = None) -> None``
+     - Async variant of ``delete``.
+   * - ``async def aclear(self, scope: StateScope, *, all_map_indices: bool = False, session: AsyncSession | None = None) -> None``
+     - Async variant of ``clear``.

Review Comment:
   Now that I think of it, since these are signatures that can change, maybe we 
do not mention it at all like how secrets backend does: 
https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/secrets-backend/index.html?
   
   They just mention that the interface must be implemented



##########
airflow-core/docs/administration-and-deployment/state-store.rst:
##########
@@ -0,0 +1,255 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..  http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-store:
+
+State Store Configuration
+==========================
+
+.. versionadded:: 3.3
+
+The state store is the persistence layer for :doc:`task state 
</core-concepts/task-state>` and :doc:`asset state 
</core-concepts/asset-state>`. By default, both are stored in the Airflow 
metadata database. This page describes the available configuration options, 
garbage-collection semantics, and how to provide a custom backend.
+
+Configuration reference
+-----------------------
+
+All options live under the ``[state_store]`` section of ``airflow.cfg``.
+
+.. note::
+
+   The config section is ``[state_store]``, **not** ``[task_state]``.
+
+``backend``
+~~~~~~~~~~~
+
+Full dotted path to a class that implements 
:class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in 
metastore backend.
+
+.. code-block:: ini
+
+    [state_store]
+    backend = mypackage.state.CustomStateBackend
+
+``default_retention_days``
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of days to retain **task state** rows after their last update. Rows 
older than this are deleted during the next garbage collection pass.
+
+* Set to ``0`` to disable time-based cleanup entirely.
+* Default: ``30``.
+* This setting does **not** apply to asset state rows.
+
+.. code-block:: ini
+
+    [state_store]
+    default_retention_days = 30
+
+``clear_on_success``
+~~~~~~~~~~~~~~~~~~~~
+
+When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes).
+
+.. important::
+
+   ``clear_on_success`` clears **task state only**. It has no effect on asset 
state. Asset state is scoped to the asset rather than the task instance and 
must be cleared explicitly.
+
+.. code-block:: ini
+
+    [state_store]
+    clear_on_success = False
+
+``state_cleanup_batch_size``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of rows deleted per batch during garbage collection cleanup. Set to 
``0`` (default) to delete all matching rows in a single statement. Tune this on 
deployments with large ``task_state`` tables to reduce lock contention.
+
+.. code-block:: ini
+
+    [state_store]
+    state_cleanup_batch_size = 10000
+
+Worker-side backend (``[workers] state_backend``)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+A separate, optional config key under ``[workers]`` lets you route task state 
and asset state values through a worker-side backend before they reach the API 
server.
+
+.. code-block:: ini
+
+    [workers]
+    state_backend = mypackage.state.S3StateBackend
+
+When this is set, ``TaskStateAccessor.set()`` calls 
``serialize_task_state_to_ref()`` on the worker-side backend before sending the 
returned value (a reference to the actual storage) to the Execution API, and 
``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the 
stored reference from the Execution API. See `Custom worker-side backends`_ 
below.
+
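The round trip described above can be sketched with a toy worker-side backend that writes each value to a local file and hands back a ``file://`` reference. ``LocalFileRefBackend`` is hypothetical: it only mirrors the four hook signatures documented under *Custom worker-side backends* and does not subclass ``BaseStateBackend``, so it cannot be plugged into ``[workers] state_backend`` as-is.

```python
from __future__ import annotations

import hashlib
import tempfile
from pathlib import Path


class LocalFileRefBackend:
    """Toy worker-side backend: payloads go to files, only refs go to the API.

    Hypothetical class that mirrors the four documented hook signatures; it
    does not subclass BaseStateBackend.
    """

    def __init__(self, root: str | Path | None = None) -> None:
        # Default to a throwaway temp directory for demonstration purposes.
        self.root = Path(root) if root is not None else Path(tempfile.mkdtemp())

    def _write(self, namespace: str, owner: str, key: str, value: str) -> str:
        # Stable file name per (owner, key), so rewriting a key overwrites it.
        digest = hashlib.sha256(f"{owner}\0{key}".encode()).hexdigest()
        path = self.root / namespace / digest
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(value, encoding="utf-8")
        return f"file://{path}"  # the compact reference kept in the database

    def _read(self, stored: str) -> str:
        return Path(stored.removeprefix("file://")).read_text(encoding="utf-8")

    def serialize_task_state_to_ref(self, *, value: str, key: str, ti_id: str) -> str:
        return self._write("task", ti_id, key, value)

    def deserialize_task_state_from_ref(self, stored: str) -> str:
        return self._read(stored)

    def serialize_asset_state_to_ref(self, *, value: str, key: str, asset_ref: str) -> str:
        return self._write("asset", asset_ref, key, value)

    def deserialize_asset_state_from_ref(self, stored: str) -> str:
        return self._read(stored)
```

Local files are only visible to the worker that wrote them, so a real backend would target shared storage (for example an object store) that every worker can reach.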
+
+Garbage collection semantics
+-----------------------------
+
+Cleanup, also known as "garbage collection", is triggered from the Airflow CLI with ``airflow state-store cleanup-task-states``. It removes state rows according to the following rules:
+
+**Time-based expiry (task state only)**
+  Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on 
the *worker* at write time, not by the server.
+
+Fallback to ``default_retention_days`` (task state only)
+  Keys written with no explicit ``retention`` (i.e. ``expires_at`` is 
``NULL``) are governed by the global ``default_retention_days`` setting. When 
this setting is positive, the garbage collection job treats such rows as 
expiring ``default_retention_days`` days after their last update.
+
+``NEVER_EXPIRE`` keys
+  Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells garbage collection to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``.
+
+**Orphan sweep (asset state)**
+  Asset state rows for assets that no longer have an ``asset_active`` record 
are deleted during the orphan-sweep pass. This cleans up state for deactivated 
or renamed assets.
+
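The task state rules above can be condensed into a single eligibility check. This is a minimal sketch, not Airflow's implementation: ``TaskStateRow``, its ``never_expire`` flag and ``is_eligible_for_cleanup`` are hypothetical names, it assumes ``default_retention_days = 0`` only disables the fallback for rows without ``expires_at``, and it leaves out the asset orphan sweep.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class TaskStateRow:
    """Hypothetical stand-in for a stored task state row."""

    updated_at: datetime
    expires_at: datetime | None = None
    never_expire: bool = False  # assumed flag set by retention=NEVER_EXPIRE


def is_eligible_for_cleanup(
    row: TaskStateRow, default_retention_days: int, now: datetime | None = None
) -> bool:
    now = now or datetime.now(timezone.utc)
    # NEVER_EXPIRE keys are skipped unconditionally.
    if row.never_expire:
        return False
    # Explicit retention: expires_at was computed on the worker at write time.
    if row.expires_at is not None:
        return row.expires_at < now
    # No explicit retention: expire default_retention_days after the last update.
    if default_retention_days > 0:
        return row.updated_at + timedelta(days=default_retention_days) < now
    # default_retention_days = 0 disables the fallback (assumed behavior).
    return False
```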
+.. important::
+
+   Garbage collection only works for the ``MetastoreStateBackend``. Custom 
backends are explicitly skipped.
+
+
+
+Custom backends
+---------------
+
+To replace the default metastore backend, subclass 
:class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract 
methods.
+
+Abstract methods
+~~~~~~~~~~~~~~~~
+
+There are four synchronous methods and four async equivalents:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 55 45
+
+   * - Signature
+     - Description
+   * - ``def get(self, scope: StateScope, key: str, *, session: Session | None 
= None) -> str | None``
+     - Return the stored value, or ``None``.
+   * - ``def set(self, scope: StateScope, key: str, value: str, *, expires_at: 
datetime | None = None, session: Session | None = None) -> None``
+     - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None``
+       for non-expiring keys.
+   * - ``def delete(self, scope: StateScope, key: str, *, session: Session | 
None = None) -> None``
+     - Delete a single key; no-op if absent.
+   * - ``def clear(self, scope: StateScope, *, all_map_indices: bool = False, 
session: Session | None = None) -> None``
+     - Delete all keys under the scope.
+   * - ``async def aget(self, scope: StateScope, key: str, *, session: 
AsyncSession | None = None) -> str | None``
+     - Async variant of ``get``.
+   * - ``async def aset(self, scope: StateScope, key: str, value: str, *, 
expires_at: datetime | None = None, session: AsyncSession | None = None) -> 
None``
+     - Async variant of ``set``.
+   * - ``async def adelete(self, scope: StateScope, key: str, *, session: 
AsyncSession | None = None) -> None``
+     - Async variant of ``delete``.
+   * - ``async def aclear(self, scope: StateScope, *, all_map_indices: bool = 
False, session: AsyncSession | None = None) -> None``
+     - Async variant of ``clear``.
+
+Dispatching on scope type
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Each method receives a ``scope`` argument that is either a :class:`~airflow.sdk.state.TaskScope` or an :class:`~airflow.sdk.state.AssetScope`. Use a ``match`` statement (or ``isinstance`` checks) to dispatch:
+
+.. code-block:: python
+
+    from airflow.sdk.state import AssetScope, BaseStateBackend, TaskScope
+
+
+    class MyBackend(BaseStateBackend):
+        # ``_task_store`` and ``_asset_store`` are placeholders for your own storage clients.
+        def get(self, scope, key, *, session=None):
+            match scope:
+                case TaskScope():
+                    return self._task_store.get(scope, key)
+                case AssetScope():
+                    return self._asset_store.get(scope, key)
+                case _:
+                    raise TypeError(f"Unsupported scope type: {type(scope).__name__}")
+
+:class:`~airflow.sdk.state.AssetScope` has three optional fields: ``asset_id`` 
(integer, server-side only), ``name``, and ``uri``. At least one must be set. 
Server-side operations (REST API calls) provide ``asset_id``. Worker-side 
operations provide ``name`` or ``uri`` (workers do not have access to the 
integer ``asset_id``).
+
+Configure the class via ``[state_store] backend``:
+
+.. code-block:: ini
+
+    [state_store]
+    backend = mypackage.state.MyBackend
+
+
+Custom worker-side backends
+----------------------------
+
+Worker-side backends extend ``BaseStateBackend`` with two pairs of 
serialization hooks. They are configured separately via ``[workers] 
state_backend`` and run *on the worker process*, not on the API server. This 
lets you store large payloads or credentialed data directly using worker 
infrastructure while only a compact reference string is kept in the database.
+
+Override these four methods:
+
+``def serialize_task_state_to_ref(self, *, value: str, key: str, ti_id: str) 
-> str``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Called by ``TaskStateAccessor.set()`` before sending the value to the 
Execution API. Return a reference string (e.g. an S3 key) that will be stored 
in the database instead of the raw value.
+
+``def deserialize_task_state_from_ref(self, stored: str) -> str``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Called by ``TaskStateAccessor.get()`` after retrieving the reference string 
from the Execution API. Return the actual value.
+
+``def serialize_asset_state_to_ref(self, *, value: str, key: str, asset_ref: 
str) -> str``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Same as the task variant, but for asset state. ``asset_ref`` is the asset name 
or URI, depending on how the accessor was constructed.
+
+``def deserialize_asset_state_from_ref(self, stored: str) -> str``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Called by ``AssetStateAccessor.get()`` to resolve the stored reference back to 
the actual value.

Review Comment:
   Same thought as above



##########
airflow-core/docs/core-concepts/task-and-asset-state.rst:
##########
@@ -0,0 +1,80 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:state-overview:
+
+Task and Asset State Overview
+=============================
+
+.. versionadded:: 3.3
+
+Airflow has always modeled tasks as stateless, idempotent units of work. A growing class of workloads, however, requires a small amount of data to be persisted outside of a task's return value, such as a submitted job ID that must survive a worker crash, a watermark that advances run by run, or a row counter exposed for observability. Task state and asset state fill that gap without touching the XCom or Variable systems.
+
+Task and Asset State
+--------------------
+
+Airflow 3.3 ships two persistent key/value stores, differentiated by *what* 
they are scoped to:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 20 25 25 30
+
+   * - Store
+     - Scope
+     - Default lifetime
+     - Primary use case
+   * - **Task state**
+     - A single task instance (dag_id + run_id + task_id + map_index)
+     - Configurable retention; cleared on task success when ``clear_on_success = True``
+     - Survive retries, track in-flight jobs, checkpoint progress within a run

Review Comment:
   Mention resume from progress made by past run?



##########
airflow-core/docs/core-concepts/task-and-asset-state.rst:
##########
@@ -0,0 +1,80 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:state-overview:
+
+Task and Asset State Overview
+=============================
+
+.. versionadded:: 3.3
+
+Airflow has always modeled tasks as stateless, idempotent units of work. A growing class of workloads, however, requires a small amount of data to be persisted outside of a task's return value, such as a submitted job ID that must survive a worker crash, a watermark that advances run by run, or a row counter exposed for observability. Task state and asset state fill that gap without touching the XCom or Variable systems.
+
+Task and Asset State
+--------------------
+
+Airflow 3.3 ships two persistent key/value stores, differentiated by *what* 
they are scoped to:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 20 25 25 30
+
+   * - Store
+     - Scope
+     - Default lifetime
+     - Primary use case
+   * - **Task state**
+     - A single task instance (dag_id + run_id + task_id + map_index)
+     - Configurable retention; cleared on task success when ``clear_on_success = True``
+     - Survive retries, track in-flight jobs, checkpoint progress within a run
+   * - **Asset state**
+     - An asset (independent of any particular run)
+     - Persists indefinitely; removed only when the asset is deactivated
+     - Cross-run watermarks, incremental-load cursors, per-asset metadata
+
+Both stores accept string keys and JSON values. Values up to 64 KB are 
supported through the default metastore backend; larger payloads can be 
offloaded via a :ref:`custom worker-side backend <state-store:worker-backends>`.
+
+When to use Task and Asset State
+--------------------------------
+
+Use this table to choose the right mechanism for your use case.
+
+.. list-table::
+   :header-rows: 1
+   :widths: 22 78
+
+   * - Mechanism
+     - When to use it
+   * - **XCom**
+     - Pass data *between tasks* within a single Dag run (e.g. the output of 
one task consumed by a downstream task). XComs are cleared on retry, and should 
NOT be used to persist state across task retries or across runs.

Review Comment:
   Across dag runs is possible, so we should not mention in a single dag run



##########
airflow-core/docs/core-concepts/task-and-asset-state.rst:
##########
@@ -0,0 +1,80 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:state-overview:
+
+Task and Asset State Overview
+=============================
+
+.. versionadded:: 3.3
+
+Airflow has always modeled tasks as stateless, idempotent units of work. A growing class of workloads, however, requires a small amount of data to be persisted outside of a task's return value, such as a submitted job ID that must survive a worker crash, a watermark that advances run by run, or a row counter exposed for observability. Task state and asset state fill that gap without touching the XCom or Variable systems.
+
+Task and Asset State
+--------------------
+
+Airflow 3.3 ships two persistent key/value stores, differentiated by *what* 
they are scoped to:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 20 25 25 30
+
+   * - Store
+     - Scope
+     - Default lifetime
+     - Primary use case
+   * - **Task state**
+     - A single task instance (dag_id + run_id + task_id + map_index)
+     - Configurable retention; cleared on task success when ``clear_on_success = True``
+     - Survive retries, track in-flight jobs, checkpoint progress within a run
+   * - **Asset state**
+     - An asset (independent of any particular run)
+     - Persists indefinitely; removed only when the asset is deactivated
+     - Cross-run watermarks, incremental-load cursors, per-asset metadata
+
+Both stores accept string keys and JSON values. Values up to 64 KB are 
supported through the default metastore backend; larger payloads can be 
offloaded via a :ref:`custom worker-side backend <state-store:worker-backends>`.
+
+When to use Task and Asset State
+--------------------------------
+
+Use this table to choose the right mechanism for your use case.
+
+.. list-table::
+   :header-rows: 1
+   :widths: 22 78
+
+   * - Mechanism
+     - When to use it
+   * - **XCom**
+     - Pass data *between tasks* within a single Dag run (e.g. the output of 
one task consumed by a downstream task). XComs are cleared on retry, and should 
NOT be used to persist state across task retries or across runs.
+   * - **Variables**
+     - Dag-wide or installation-wide configuration that changes infrequently 
and is set by operators rather than by tasks themselves.

Review Comment:
   ```suggestion
        - Deployment wide configuration store that changes infrequently and is 
set by deployment managers / users rather than by tasks themselves.
   ```



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary via the ``task_state`` key. From there, it can be used to retrieve, set, delete, or clear task state for a specific key-value pair. In this example, the value stored under ``my_key`` is retrieved from task state and then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+
+    @task
+    def my_task(**context):
+        # Retrieve the task state accessor from the context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")  # None if the key has not been set yet
+        print(f"Previous value: {my_value}")
+
+        # Set the new value (randint is inclusive, so this yields 1 to 12)
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+-------------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored string value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # returns str or None
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites a value for the specified key. ``value`` can be any JSON-compatible type, including:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``
+
+The optional ``retention`` argument controls when the key expires:
+
+* ``timedelta(...)``: expire after the given duration from the time of the 
write (e.g. ``timedelta(hours=6)``).  The expiry timestamp is computed on the 
worker in UTC before the value is sent to the API server.
+* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage 
collection, regardless of the global ``[state_store] default_retention_days`` 
setting.
+* ``None`` (default): fall back to the global ``[state_store] 
default_retention_days`` config.
+
+.. important::
+
+   ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain 
integer number of days.  Passing an integer raises a ``TypeError``.
+
+   .. code-block:: python
+
+       from datetime import timedelta
+
+       # correct
+       task_state.set("key", "val", retention=timedelta(days=7))
+
+       # wrong: raises TypeError
+       task_state.set("key", "val", retention=7)
+
+``NEVER_EXPIRE`` sentinel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``:
+
+.. code-block:: python
+
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE

Review Comment:
   ```suggestion
       from airflow.sdk import NEVER_EXPIRE
   ```



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary via the ``task_state`` key. From there, it can be used to retrieve, set, delete, or clear task state for a specific key-value pair. In this example, the value stored under ``my_key`` is retrieved from task state and then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+
+    @task
+    def my_task(**context):
+        # Retrieve the task state accessor from the context
+        task_state = context["task_state"]
+        my_value = task_state.get("my_key")  # None if the key has not been set yet
+        print(f"Previous value: {my_value}")
+
+        # Set the new value (randint is inclusive, so this yields 1 to 12)
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+-------------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored string value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # returns str or None
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites a value for the specified key. ``value`` can be any JSON-compatible type, including:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``
+
+The optional ``retention`` argument controls when the key expires:
+
+* ``timedelta(...)``: expire after the given duration from the time of the 
write (e.g. ``timedelta(hours=6)``).  The expiry timestamp is computed on the 
worker in UTC before the value is sent to the API server.
+* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage 
collection, regardless of the global ``[state_store] default_retention_days`` 
setting.
+* ``None`` (default): fall back to the global ``[state_store] 
default_retention_days`` config.
+
+.. important::
+
+   ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain 
integer number of days.  Passing an integer raises a ``TypeError``.
+
+   .. code-block:: python
+
+       from datetime import timedelta
+
+       # correct
+       task_state.set("key", "val", retention=timedelta(days=7))
+
+       # wrong: raises TypeError
+       task_state.set("key", "val", retention=7)
+
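The ``retention`` rules can be summarized as a small worker-side helper. This is a sketch under assumptions: ``compute_expires_at`` and the local ``NEVER_EXPIRE`` object are hypothetical stand-ins, and the helper follows the garbage-collection description in which keys written with ``retention=None`` carry no ``expires_at`` and are handled by ``default_retention_days`` at cleanup time.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for the SDK's NEVER_EXPIRE sentinel.
NEVER_EXPIRE = object()


def compute_expires_at(retention, now: datetime | None = None) -> datetime | None:
    """Return the UTC expiry timestamp for a write, or None if it has none.

    NEVER_EXPIRE and retention=None both yield None here; in the metastore
    backend they are told apart by a separate "never expire" flag and by the
    [state_store] default_retention_days fallback respectively.
    """
    if retention is NEVER_EXPIRE or retention is None:
        return None
    if not isinstance(retention, timedelta):
        # Plain integers (e.g. a number of days) are rejected.
        raise TypeError(f"retention must be a timedelta, got {type(retention).__name__}")
    # Computed on the worker, in UTC, before the value is sent anywhere.
    return (now or datetime.now(timezone.utc)) + retention
```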
+``NEVER_EXPIRE`` sentinel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``:
+
+.. code-block:: python
+
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE
+
+    task_state.set("job_id", job_id, retention=NEVER_EXPIRE)
+
+``delete(key)``
+~~~~~~~~~~~~~~~
+
+Deletes a single key.  No-op if the key does not exist.
+
+.. code-block:: python
+
+    task_state.delete("job_id")
+
+``clear(all_map_indices=False)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Deletes *all* state keys for this task instance.
+
+For :doc:`mapped tasks <dynamic-task-mapping>`, the default clears only the 
current map index.  Pass ``all_map_indices=True`` to wipe state across 
**every** mapped instance of the task (fleet-wide reset).
+
+.. code-block:: python
+
+    # clear only this map index
+    task_state.clear()
+
+    # clear all map indices (fleet-wide)
+    task_state.clear(all_map_indices=True)
+
+
+Use Cases
+---------
+
+External job resumption
+~~~~~~~~~~~~~~~~~~~~~~~
+
+A common pattern for long-running external jobs: check whether a job ID is 
already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives
+the default retention window.
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow.sdk import DAG, task
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE

Review Comment:
   ```suggestion
       from airflow.sdk import NEVER_EXPIRE
   ```



##########
airflow-core/docs/core-concepts/asset-state.rst:
##########
@@ -0,0 +1,229 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:asset-state:
+
+Asset State
+===========
+
+.. versionadded:: 3.3
+
+Asset state is a persistent key/value store scoped to an *asset*, independent 
of any particular DAG run.  Unlike :doc:`task state 
</core-concepts/task-state>`, which is tied to a single task instance, asset 
state persists across runs and is logically owned by the asset itself.  It is 
the natural home for cross-run metadata such as watermarks, incremental-load 
cursors, and per-asset configuration.
+
+Asset state is accessed through the task context via 
``context["asset_state"]``.
+
+
+When is ``asset_state`` available?
+------------------------------------
+
+When using asset state within a task, ``context["asset_state"]`` is populated 
for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and 
outlets. A task must declare at least one concrete inlet or outlet for 
``asset_state`` to contain any entries.
+
+If using asset state in a ``BaseEventTrigger``, use the ``self.asset_state`` attribute instead. It can be subscripted in the same way as ``context["asset_state"]``.
+
+.. warning::
+
+   **Outlets-only tasks**: if a task declares only ``outlets`` (no 
``inlets``), ``context["asset_state"][my_asset]`` may raise a ``KeyError`` at 
runtime.  The workaround is to declare the asset in **both** ``inlets`` and 
``outlets``.
+
+   .. code-block:: python
+
+       from airflow.sdk import Asset, task
+
+       my_asset = Asset("my_data", uri="s3://bucket/my_data")
+
+
+       @task(inlets=[my_asset], outlets=[my_asset])
+       def write_asset(**context):
+           context["asset_state"][my_asset].set("watermark", "2024-01-01")
+
+   This known issue will be resolved in a future release.
+
+
+Accessing asset state using ``context``
+---------------------------------------
+
+To bring an asset into scope, include it in ``inlets`` (or in both ``inlets`` and ``outlets``). Then subscript ``context["asset_state"]`` with the asset object to get that asset's state accessor.
+
+.. code-block:: python
+
+    from airflow.sdk import Asset, DAG, task
+
+    my_asset = Asset("my_data", uri="s3://bucket/my_data")
+
+    with DAG("example_asset_state", schedule=None):
+
+        @task(inlets=[my_asset], outlets=[my_asset])
+        def process(**context):
+            asset_state = context["asset_state"][my_asset]
+            watermark = asset_state.get("watermark")
+            asset_state.set("watermark", "2024-06-01")
+
+To see asset state in action in a real DAG, check out `example_asset_state.py <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_asset_state.py>`_.
+
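A typical cross-run pattern is an incremental load driven by a watermark. In this sketch ``run_incremental_load`` and ``DictAccessor`` are hypothetical: the helper accepts anything with ``get(key)`` and ``set(key, value)``, such as ``context["asset_state"][my_asset]``, and ``DictAccessor`` is an in-memory test double. Records are assumed to carry ISO-8601 ``updated_at`` strings in one consistent format, so string comparison follows time order.

```python
from __future__ import annotations

from collections.abc import Callable


def run_incremental_load(asset_state, records: list[dict], load: Callable[[list[dict]], None]) -> list[dict]:
    """Load records newer than the stored watermark, then advance it."""
    watermark = asset_state.get("watermark")  # None on the very first run
    fresh = [r for r in records if watermark is None or r["updated_at"] > watermark]
    if fresh:
        load(fresh)  # if this raises, the watermark is left untouched
        asset_state.set("watermark", max(r["updated_at"] for r in fresh))
    return fresh


class DictAccessor:
    """Test double standing in for a per-asset state accessor."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}

    def get(self, key: str) -> str | None:
        return self.data.get(key)

    def set(self, key: str, value: str) -> None:
        self.data[key] = value
```

Advancing the watermark only after ``load`` returns means a failed load leaves the watermark where it was, so the same records are picked up again on the next run.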
+Accessing asset state in a ``BaseEventTrigger``
+-----------------------------------------------
+
+When building triggers that watch assets, retrieve asset state through the ``self.asset_state`` attribute.
+
+.. code-block:: python
+
+    from collections.abc import AsyncIterator
+
+    from airflow.sdk import Asset, BaseEventTrigger, TriggerEvent
+
+
+    class GenericEventTrigger(BaseEventTrigger):
+        ...
+
+        async def run(self) -> AsyncIterator[TriggerEvent]:
+            """Logic that fires a TriggerEvent."""
+            my_data = Asset(name="my_data")
+            asset_state = self.asset_state[my_data]
+            watermark = asset_state.get("watermark")
+            asset_state.set("watermark", "2024-06-01")
+            yield TriggerEvent({"previous_watermark": watermark})
+
+In the example above, ``my_data`` is created using ``name``. The ``uri`` can also be used:
+
+.. code-block:: python
+
+    from collections.abc import AsyncIterator
+
+    from airflow.sdk import Asset, BaseEventTrigger, TriggerEvent
+
+
+    class GenericEventTrigger(BaseEventTrigger):
+        ...
+
+        async def run(self) -> AsyncIterator[TriggerEvent]:
+            """Logic that fires a TriggerEvent."""
+            my_data = Asset(uri="s3://bucket/my_data")
+            asset_state = self.asset_state[my_data]
+            watermark = asset_state.get("watermark")
+            asset_state.set("watermark", "2024-06-01")
+            yield TriggerEvent({"previous_watermark": watermark})
+
+Single-inlet shorthand
+~~~~~~~~~~~~~~~~~~~~~~~
+
+For tasks with exactly **one** concrete inlet, you can call ``get``, ``set``, 
``delete``, and ``clear`` directly on ``context["asset_state"]`` without 
subscripting.
+
+.. code-block:: python
+
+    @task(inlets=[my_asset], outlets=[my_asset])
+    def process_single(**context):
+        asset_state = context["asset_state"]
+        watermark = asset_state.get("watermark")
+        asset_state.set("watermark", "2024-06-01")
+
+If the task has more than one concrete inlet, calling the shorthand raises a 
``ValueError``.  Use the subscript form (``context["asset_state"][my_asset]``) 
whenever a task has multiple inlets.
+
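The shorthand rule can be modeled in a few lines. ``AssetStateMapping`` is a hypothetical stand-in, not the class Airflow puts in the context: subscripting returns the per-asset accessor, while the shorthand methods forward to the single inlet's accessor and raise ``ValueError`` otherwise.

```python
from __future__ import annotations


class AssetStateMapping:
    """Hypothetical model of context["asset_state"] and its shorthand."""

    def __init__(self, accessors: dict, inlets: list) -> None:
        self._accessors = accessors  # asset -> per-asset accessor
        self._inlets = list(inlets)  # concrete inlets declared on the task

    def __getitem__(self, asset):
        return self._accessors[asset]

    def _single(self):
        # The shorthand is only unambiguous when there is exactly one inlet.
        if len(self._inlets) != 1:
            raise ValueError(
                f"Shorthand needs exactly one concrete inlet, found {len(self._inlets)}; "
                'use context["asset_state"][asset] instead'
            )
        return self._accessors[self._inlets[0]]

    def get(self, key):
        return self._single().get(key)

    def set(self, key, value):
        return self._single().set(key, value)

    def delete(self, key):
        return self._single().delete(key)

    def clear(self):
        return self._single().clear()
```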
+
+API reference
+-------------
+
+The following methods are available on the per-asset accessor (``context["asset_state"][my_asset]``), on the shorthand (``context["asset_state"]``) when the task has exactly one inlet, and on the per-asset accessors obtained from ``self.asset_state`` in a trigger.
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored JSON value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    # Using context
+    watermark = context["asset_state"][my_asset].get("watermark")
+
+    # Using self.asset_state
+    watermark = self.asset_state[my_asset].get("watermark")
+
+``set(key, value)``
+~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites a key-value pair. Unlike task state, asset state has no ``retention`` parameter. Values persist until explicitly deleted or until the asset is deactivated.
+
+.. code-block:: python
+
+    # Using context
+    context["asset_state"][my_asset].set("watermark", "2024-06-01T00:00:00Z")
+
+    # Using self.asset_state
+    self.asset_state[my_asset].set("watermark", "2024-06-01T00:00:00Z")

Review Comment:
   Worth mentioning that None cannot be set or the types that can be set



##########
airflow-core/docs/administration-and-deployment/state-cleanup.rst:
##########
@@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _state-cleanup:
+
+State Cleanup
+=============
+
+.. versionadded:: 3.3
+
+Airflow does not automatically purge task state rows on a schedule. Cleanup (also known as "garbage collection") is your responsibility and must be triggered explicitly via the CLI. This page explains what gets cleaned up, how to run it, and how to integrate it into a recurring maintenance workflow.
+
+
+What gets cleaned up
+--------------------
+
+The cleanup command operates only on **task state** rows in the 
``MetastoreStateBackend``. Asset state rows are never touched by this command. 
Asset state rows are removed only by the orphan sweep when an asset is 
deactivated (see :ref:`state-store`).
+
+A task state row is eligible for deletion when its ``expires_at`` timestamp is 
in the past. ``expires_at`` is computed on the worker at write time:
+
+* Keys written with an explicit ``retention=timedelta(...)`` expire after that 
duration from the time of the write.
+* Keys written with ``retention=None`` (the default) pick up an expiry based 
on ``[state_store] default_retention_days``. If that value is ``> 0``, the key 
expires that many days after the write.

Review Comment:
   Or rather we shouldn't at all allow it
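
The expiry rules in the hunk above can be sketched as follows. This is a hypothetical illustration, not Airflow's code: `compute_expires_at`, `is_eligible_for_cleanup`, and the local `NEVER_EXPIRE` sentinel are stand-ins. The `NEVER_EXPIRE` handling follows the task-state page. The excerpt does not say what happens when `default_retention_days` is `<= 0`, so the sketch treats that case as "no expiry" purely as an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical stand-in for airflow.sdk.execution_time.context.NEVER_EXPIRE.
NEVER_EXPIRE = object()


def compute_expires_at(
    retention: object, default_retention_days: int, now: datetime
) -> Optional[datetime]:
    """Return the expiry timestamp for a write at `now`, or None if the key never expires."""
    if retention is NEVER_EXPIRE:
        # Skipped by cleanup regardless of default_retention_days.
        return None
    if isinstance(retention, timedelta):
        # Explicit retention: expire that long after the write.
        return now + retention
    if retention is None:
        if default_retention_days > 0:
            return now + timedelta(days=default_retention_days)
        # Assumption: behaviour for values <= 0 is not stated in the excerpt.
        return None
    # Plain integers (e.g. retention=7) are rejected, as the task-state page documents.
    raise TypeError("retention must be a timedelta, NEVER_EXPIRE, or None")


def is_eligible_for_cleanup(expires_at: Optional[datetime], now: datetime) -> bool:
    """A row is eligible for deletion once its expires_at timestamp is in the past."""
    return expires_at is not None and expires_at < now
```

In practice `now` would be `datetime.now(timezone.utc)`, since the task-state page says the expiry timestamp is computed on the worker in UTC. Passing it explicitly keeps the sketch deterministic and easy to test.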



##########
airflow-core/docs/core-concepts/task-state.rst:
##########
@@ -0,0 +1,270 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-state:
+
+Task State
+==========
+
+.. versionadded:: 3.3
+
+Task state is a persistent key/value store scoped to a single task instance 
(``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker 
crashes and task retries within the same Dag run, making it suitable for 
storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task state is accessed through the task context via ``context["task_state"]`` 
and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task state
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, 
task state is available through the ``context`` dictionary via the 
``task_state`` key. From there, you can retrieve, set, or delete individual 
keys, or clear all state for the task instance. In this example, the value of 
``my_key`` is retrieved from task state, then updated.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+
+    @task
+    def my_task(**context):
+        # Retrieve the task state accessor from the context
+        task_state = context["task_state"]
+        previous_value = task_state.get("my_key")  # None if the key was never set
+        print(f"Previous value: {previous_value}")
+
+        # Overwrite the key (random.randint is inclusive, so this yields 1..12)
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_state.set("my_key", new_value)
+
+Reference
+---------
+
+``get(key)``
+~~~~~~~~~~~~
+
+Returns the stored value, or ``None`` if the key does not exist.
+
+.. code-block:: python
+
+    value = task_state.get("job_id")  # the stored value, or None
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites a value for the specified key. ``value`` can be any 
JSON-compatible type, including:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``
+
+The optional ``retention`` argument controls when the key expires:
+
+* ``timedelta(...)``: expire after the given duration from the time of the 
write (e.g. ``timedelta(hours=6)``).  The expiry timestamp is computed on the 
worker in UTC before the value is sent to the API server.
+* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage 
collection, regardless of the global ``[state_store] default_retention_days`` 
setting.
+* ``None`` (default): fall back to the global ``[state_store] 
default_retention_days`` config.
+
+.. important::
+
+   ``retention`` accepts a :class:`~datetime.timedelta`, ``NEVER_EXPIRE``, or 
``None``, not a plain integer number of days.  Passing an integer raises a 
``TypeError``.
+
+   .. code-block:: python
+
+       from datetime import timedelta
+
+       # correct
+       task_state.set("key", "val", retention=timedelta(days=7))
+
+       # wrong: raises TypeError
+       task_state.set("key", "val", retention=7)
+
+``NEVER_EXPIRE`` sentinel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``:
+
+.. code-block:: python
+
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE
+
+    task_state.set("job_id", job_id, retention=NEVER_EXPIRE)
+
+``delete(key)``
+~~~~~~~~~~~~~~~
+
+Deletes a single key.  No-op if the key does not exist.
+
+.. code-block:: python
+
+    task_state.delete("job_id")
+
+``clear(all_map_indices=False)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Deletes *all* state keys for this task instance.
+
+For :doc:`mapped tasks <dynamic-task-mapping>`, the default clears only the 
current map index.  Pass ``all_map_indices=True`` to wipe state across 
**every** mapped instance of the task (fleet-wide reset).
+
+.. code-block:: python
+
+    # clear only this map index
+    task_state.clear()
+
+    # clear all map indices (fleet-wide)
+    task_state.clear(all_map_indices=True)
+
+
+Use Cases
+---------
+
+External job resumption
+~~~~~~~~~~~~~~~~~~~~~~~
+
+A common pattern for long-running external jobs: check whether a job ID is 
already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives
+the default retention window.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, task
+    from airflow.sdk.execution_time.context import NEVER_EXPIRE
+
+
+    with DAG("spark_job_dag", schedule=None):
+
+        @task
+        def run_spark_job(**context):
+            task_state = context["task_state"]
+
+            # Check for an already-submitted job from a previous attempt.
+            job_id = task_state.get("job_id")
+            if job_id is None:
+                job_id = spark_client.submit_job(...)
+                # Store with NEVER_EXPIRE so the key is not garbage-collected
+                # before the job finishes.
+                task_state.set("job_id", str(job_id), retention=NEVER_EXPIRE)
+
+            # Reattach to the job and wait for completion.
+            result = spark_client.wait_for_completion(job_id)
+            return result
+
+On a retry, the task finds the stored ``job_id`` and reattaches instead of 
submitting a duplicate job. Another example of this sort of logic can be found 
in `example_task_state.py 
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_task_state.py>`_.
+
+Intra-task checkpointing
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+For tasks that process paginated or batched data, store the last-completed 
offset so a retry can resume mid-stream rather than restarting from the 
beginning.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, task
+
+
+    with DAG("paginated_ingest", schedule="@daily"):
+
+        @task
+        def ingest_pages(**context):
+            # Retrieve the task_state
+            task_state = context["task_state"]
+            raw = task_state.get("last_page")
+
+            start_page = raw + 1 if raw is not None else 1
+
+            for page in range(start_page, total_pages + 1):
+                fetch_and_load(page)
+                task_state.set("last_page", page)  # Update the task_state for reuse later
+
+
+On a retry, the task reads ``last_page`` and skips pages that were already 
processed.
+
+Progress metadata
+~~~~~~~~~~~~~~~~~
+
+Task state can expose in-progress metrics for observability (row counts, 
status strings, or lightweight JSON payloads) without requiring XCom or an 
external system.
+
+.. code-block:: python
+
+    import json
+
+    from airflow.sdk import DAG, task
+
+
+    with DAG("row_ingest", schedule="@hourly"):
+
+        @task
+        def ingest_rows(**context):
+            task_state = context["task_state"]
+            total = 0
+
+            for batch in get_batches():
+                load(batch)
+                total += len(batch)
+                task_state.set(
+                    "progress",
+                    {
+                        "rows_loaded": total,
+                        "status": "running"
+                    },
+                )
+
+            task_state.set(
+                "progress",
+                json.dumps({

Review Comment:
   ```suggestion
                   {
   ```
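
To unit-test task logic like the checkpointing pattern above without a running Airflow deployment, one option is a small in-memory fake of the task-state accessor. This is a hypothetical sketch, not Airflow's implementation. `FakeTaskState`, `make_loader`, and the local `NEVER_EXPIRE` are illustrative stand-ins that mimic only the documented behaviour:

* missing keys return `None`
* `delete` is a no-op for absent keys
* `retention` must be a `timedelta`, `NEVER_EXPIRE`, or `None`

The fake ignores expiry and map indices.

```python
from datetime import timedelta
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-in for airflow.sdk.execution_time.context.NEVER_EXPIRE.
NEVER_EXPIRE = object()


class FakeTaskState:
    """In-memory, hypothetical stand-in for context["task_state"] (one map index, no expiry)."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._retention: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        # Missing keys return None, as documented for task_state.get().
        return self._data.get(key)

    def set(self, key: str, value: Any, *, retention: Any = None) -> None:
        # Mirror the documented rule: only timedelta, NEVER_EXPIRE, or None.
        valid = retention is None or retention is NEVER_EXPIRE or isinstance(retention, timedelta)
        if not valid:
            raise TypeError("retention must be a timedelta, NEVER_EXPIRE, or None")
        self._data[key] = value
        self._retention[key] = retention

    def delete(self, key: str) -> None:
        # No-op if the key does not exist.
        self._data.pop(key, None)
        self._retention.pop(key, None)

    def clear(self, all_map_indices: bool = False) -> None:
        # Single map index only, so the flag has no effect in this fake.
        self._data.clear()
        self._retention.clear()


def ingest_pages(task_state: Any, total_pages: int, fetch_and_load: Callable[[int], None]) -> None:
    """Checkpointing loop mirroring the paginated_ingest example."""
    last = task_state.get("last_page")
    start_page = last + 1 if last is not None else 1
    for page in range(start_page, total_pages + 1):
        fetch_and_load(page)
        task_state.set("last_page", page)


def make_loader(sink: List[int], fail_on: Optional[int] = None) -> Callable[[int], None]:
    """Hypothetical loader that records pages and can simulate a failure."""
    def load(page: int) -> None:
        if page == fail_on:
            raise RuntimeError(f"simulated failure on page {page}")
        sink.append(page)
    return load


if __name__ == "__main__":
    state = FakeTaskState()
    loaded: List[int] = []
    try:
        # First attempt fails on page 3, after pages 1 and 2 were checkpointed.
        ingest_pages(state, 5, make_loader(loaded, fail_on=3))
    except RuntimeError:
        pass
    # The "retry" resumes from page 3 instead of starting over.
    ingest_pages(state, 5, make_loader(loaded))
    print(loaded)  # [1, 2, 3, 4, 5]
```

Keeping the loop body as a plain function that takes the accessor as an argument is what makes it testable. Inside a real `@task` you would pass `context["task_state"]`, and in a test you would pass the fake.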



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
