kaxil commented on code in PR #67299: URL: https://github.com/apache/airflow/pull/67299#discussion_r3366363389
########## airflow-core/docs/administration-and-deployment/task-and-asset-store.rst: ########## @@ -0,0 +1,210 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _task-and-asset-store: + +Task and Asset Store Configuration +==================================== + +.. versionadded:: 3.3 + +The task and asset store is the persistence layer for :doc:`task store </core-concepts/task-store>` and :doc:`asset store </core-concepts/asset-store>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_store]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. Review Comment: The class is `BaseStoreBackend`, not `BaseStateBackend`. 
It is exported as `airflow.sdk.state.BaseStoreBackend` ([state.py](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/task-sdk/src/airflow/sdk/state.py#L21-L25)) and the backend resolver checks `issubclass(clazz, BaseStoreBackend)` ([state/__init__.py L36-L38](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/airflow-core/src/airflow/state/__init__.py#L36-L38)). `BaseStateBackend` does not exist, so `from airflow.sdk.state import BaseStateBackend` (lines 131 and 173) raises ImportError and the `:class:` xrefs will not resolve. Needs fixing at lines 39, 125, 131, 134, 156, 179. The metastore class is also `MetastoreStoreBackend`, not `MetastoreStateBackend` (lines 118 and 209).
########## airflow-core/docs/core-concepts/asset-store.rst: ########## @@ -0,0 +1,189 @@ + +.. _concepts:asset-store: + +.. spelling:word-list:: + + subscripted + subscripting + +Asset Store +=========== + +.. versionadded:: 3.3 + +Asset store is a persistent key/value store scoped to an *asset*, independent of any particular DAG run.
Unlike :doc:`task store </core-concepts/task-store>`, which is tied to a single task instance, asset store persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset store is accessed through the task context via ``context["asset_store"]``. + + +When is ``asset_store`` available? +---------------------------------- + +When using asset store within a task, ``context["asset_store"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. A task must declare at least one concrete inlet or outlet for ``asset_store`` to contain any entries. + +Accessing asset store using ``context`` +--------------------------------------- + +An asset becomes available through context["asset_store"] when it is included in inlets (or in both inlets and outlets). You can then retrieve its asset store by subscripting context["asset_store"] with the asset object. + +.. code-block:: python + + from airflow.sdk import Asset, DAG, task + + my_asset = Asset("my_data", uri="s3://bucket/my_data") + + with DAG("example_asset_store", schedule=None): + + @task(inlets=[my_asset], outlets=[my_asset]) + def process(**context): + asset_store = context["asset_store"][my_asset] + watermark = asset_store.get("watermark") + asset_store.set("watermark", "2024-06-01") + +To see asset store in-action in a real DAG, checkout the DAG in `example_asset_store.py <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_asset_store.py>`_. + +Single-inlet shorthand +~~~~~~~~~~~~~~~~~~~~~~~ + +For tasks with exactly **one** concrete inlet, you can call ``get``, ``set``, ``delete``, and ``clear`` directly on ``context["asset_store"]`` without subscripting. + +.. 
code-block:: python + + @task(inlets=[my_asset], outlets=[my_asset]) + def process_single(**context): + asset_store = context["asset_store"] + watermark = asset_store.get("watermark") + asset_store.set("watermark", "2024-06-01") + +If the task has more than one concrete inlet, calling the shorthand raises a ``ValueError``. Use the subscript form (``context["asset_store"][my_asset]``) whenever a task has multiple inlets. + + +API reference +------------- + +The following methods are available on both the per-asset accessor (``context["asset_store"][my_asset]``), the shorthand (``context["asset_store"]``) when the task has exactly one inlet, and when using the ``self.asset_store`` attribute. Review Comment: There is no `self.asset_store` attribute. Asset store is reached only through the task context (`context["asset_store"]`, optionally subscripted by the asset), implemented by `AssetStoreAccessors` ([context.py L774-L849](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/task-sdk/src/airflow/sdk/execution_time/context.py#L774-L849)); there is no operator-level `self` accessor. Every `self.asset_store[...]` example in this section (lines 93, 113, 126, 139) raises `AttributeError`. Suggest dropping the `self.asset_store` column and keeping `context["asset_store"]` as the single documented path.
########## airflow-core/docs/core-concepts/asset-store.rst: ########## @@ -0,0 +1,189 @@ + +.. _concepts:asset-store: + +.. spelling:word-list:: + + subscripted + subscripting + +Asset Store +=========== + +.. versionadded:: 3.3 + +Asset store is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task store </core-concepts/task-store>`, which is tied to a single task instance, asset store persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset store is accessed through the task context via ``context["asset_store"]``. + + +When is ``asset_store`` available? +---------------------------------- + +When using asset store within a task, ``context["asset_store"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. A task must declare at least one concrete inlet or outlet for ``asset_store`` to contain any entries. + +Accessing asset store using ``context`` +--------------------------------------- + +An asset becomes available through context["asset_store"] when it is included in inlets (or in both inlets and outlets). You can then retrieve its asset store by subscripting context["asset_store"] with the asset object. + +..
code-block:: python + + from airflow.sdk import Asset, DAG, task + + my_asset = Asset("my_data", uri="s3://bucket/my_data") + + with DAG("example_asset_store", schedule=None): + + @task(inlets=[my_asset], outlets=[my_asset]) + def process(**context): + asset_store = context["asset_store"][my_asset] + watermark = asset_store.get("watermark") + asset_store.set("watermark", "2024-06-01") + +To see asset store in-action in a real DAG, checkout the DAG in `example_asset_store.py <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_asset_store.py>`_. + +Single-inlet shorthand +~~~~~~~~~~~~~~~~~~~~~~~ + +For tasks with exactly **one** concrete inlet, you can call ``get``, ``set``, ``delete``, and ``clear`` directly on ``context["asset_store"]`` without subscripting. + +.. code-block:: python + + @task(inlets=[my_asset], outlets=[my_asset]) + def process_single(**context): + asset_store = context["asset_store"] + watermark = asset_store.get("watermark") + asset_store.set("watermark", "2024-06-01") + +If the task has more than one concrete inlet, calling the shorthand raises a ``ValueError``. Use the subscript form (``context["asset_store"][my_asset]``) whenever a task has multiple inlets. + + +API reference +------------- + +The following methods are available on both the per-asset accessor (``context["asset_store"][my_asset]``), the shorthand (``context["asset_store"]``) when the task has exactly one inlet, and when using the ``self.asset_store`` attribute. + +``get(key, default)`` +~~~~~~~~~~~~~~~~~~~~~ + +Returns the stored JSON value, or the ``default`` value if the key does not exist. + +.. code-block:: python + + # Using context + watermark = context["asset_store"][my_asset].get("watermark", default="initial_watermark") + + # Using self.asset_store + watermark = self.asset_store[my_asset].get("watermark") + +``set(key, value)`` +~~~~~~~~~~~~~~~~~~~ + +Writes or overwrites a key-value pair. 
Unlike task store, asset store has no ``retention`` parameter. Values persist until explicitly deleted or until the asset is deactivated. Like with task store, ``value`` can be any JSON-compatible type, except for ``None``. This includes: + +* ``str`` +* ``int`` +* ``float`` +* ``bool`` +* ``list`` +* ``dict`` + +.. code-block:: python + + # Using context + context["asset_store"][my_asset].set("watermark", default="2024-06-01T00:00:00Z") Review Comment: `set()` takes the value positionally and has no `default` parameter; the signature is `set(key, value)` ([context.py L835](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/task-sdk/src/airflow/sdk/execution_time/context.py#L835)). `set("watermark", default="...")` raises `TypeError`. It should be `set("watermark", "2024-06-01T00:00:00Z")`. Same on line 113.
########## airflow-core/docs/core-concepts/task-store.rst: ########## @@ -0,0 +1,278 @@ + +.. _concepts:task-store: + +.. spelling:word-list:: + + intra + Intra + checkpointing + +Task Store +========== + +..
versionadded:: 3.3 + +Task store is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata. + +Data persisted via task store is accessed through the task context via ``context["task_store"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``. + + +Accessing task store +-------------------- + +Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task store is available through the ``context`` dictionary via the ``task_store`` key. From there, it can be used to retrieve, set, delete, or clear data for a specific key-value pair. In this example, the ``job_id`` is retrieved from task store, then updated, before being deleted. All data for that task is then removed using the ``clear`` method. + +.. code-block:: python + + from airflow.sdk import task + import random + + + @task + def my_task(**context): + # Retrieve task_store from context + task_store = context["task_store"] + my_value = task_store.get("my_key", default="my_default_key") + + # Set the new value + new_value = f"It is {random.randint(1, 12 + 1)} o'clock" + task_store.set("my_key", new_value) + + # Delete the value + task_store.delete("my_key") + + # Clear all store entries for the task + task_store.clear() + +Reference +--------- + +``get(key, default)`` +~~~~~~~~~~~~~~~~~~~~~ + +Returns the stored JSON value, or the ``default`` value if the key does not exist. + +.. code-block:: python + + value = task_store.get( + "job_id", default="123456789" + ) # returns the value associated with `job_id` or the default value + +``set(key, value, *, retention=None)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Writes or overwrites a value for the specified key. Note, ``value`` can be any JSON-compatible type, except for ``None``. 
This includes: + +* ``str`` +* ``int`` +* ``float`` +* ``bool`` +* ``list`` +* ``dict`` + +The optional ``retention`` argument controls when the key expires: + +* ``timedelta(...)``: expire after the given duration from the time of the write (e.g. ``timedelta(hours=6)``). The expiry timestamp is computed on the worker before the value is sent to the API server. +* ``NEVER_EXPIRE``: the key never expires and is skipped during garbage collection, regardless of the global ``[state_store] default_retention_days`` setting. +* ``None`` (default): fall back to the global ``[state_store] default_retention_days`` config. + +.. important:: + + ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain integer number of days. Passing an integer raises a ``TypeError``. + + .. code-block:: python + + # correct + task_store.set("key", "val", retention=timedelta(days=7)) + + # wrong — raises TypeError + task_store.set("key", "val", retention=7) + +``NEVER_EXPIRE`` sentinel +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``: Review Comment: The prose says to import `NEVER_EXPIRE` from `airflow.sdk.execution_time.context`, but the code block just below uses `from airflow.sdk import NEVER_EXPIRE`. Both resolve (it is defined in `execution_time.context` and re-exported via `airflow.sdk`), but `execution_time.context` is the internal module and `airflow.sdk` is the public export. Recommend the prose point at `airflow.sdk` to match the code and the other examples.
########## airflow-core/docs/core-concepts/asset-store.rst: ########## @@ -0,0 +1,189 @@ + +.. _concepts:asset-store: + +.. spelling:word-list:: + + subscripted + subscripting + +Asset Store +=========== + +.. versionadded:: 3.3 + +Asset store is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task store </core-concepts/task-store>`, which is tied to a single task instance, asset store persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset store is accessed through the task context via ``context["asset_store"]``. + + +When is ``asset_store`` available? +---------------------------------- + +When using asset store within a task, ``context["asset_store"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. A task must declare at least one concrete inlet or outlet for ``asset_store`` to contain any entries. + +Accessing asset store using ``context`` +--------------------------------------- + +An asset becomes available through context["asset_store"] when it is included in inlets (or in both inlets and outlets). You can then retrieve its asset store by subscripting context["asset_store"] with the asset object. Review Comment: This availability rule undercounts outlets.
In `AssetStoreAccessors.__init__` the outlet loop adds an accessor for each `Asset`/`AssetNameRef`/`AssetUriRef` outlet, and `_total = len(by_name) + len(by_uri)` counts inlets and outlets together ([context.py L803-L825](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/task-sdk/src/airflow/sdk/execution_time/context.py#L803-L825)); the shorthand raises only when `_total != 1` ("Task has N concrete inlets and outlets"). So this should read "inlets or outlets", line 64 should be "exactly one concrete inlet or outlet", and line 74 likewise. It matters for the headline case: a watermark writer naturally declares the asset as an outlet only, and as written the docs tell that user the store is unavailable.
########## airflow-core/docs/administration-and-deployment/task-and-asset-store.rst: ########## @@ -0,0 +1,210 @@ + +.. _task-and-asset-store: + +Task and Asset Store Configuration +==================================== + +.. versionadded:: 3.3 + +The task and asset store is the persistence layer for :doc:`task store </core-concepts/task-store>` and :doc:`asset store </core-concepts/asset-store>`. By default, both are stored in the Airflow metadata database.
This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_store]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days after which task store rows expire. When a key is written with no explicit retention, expires_at is computed on the worker as now + default_retention_days. Changing this setting does not affect already-written rows. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset store rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task store keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task store entries after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task store only**. It has no effect on asset store. Asset store is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during garbage collection cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. 
Tune this on deployments with large ``task_store`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +.. _task-and-asset-store:worker-backends: + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task store and asset store values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStoreAccessor.set()`` calls ``serialize_task_store_to_ref()`` on the worker-side backend before sending the returned value (a reference to the actual storage) to the Execution API, and ``get()`` calls ``deserialize_task_store_from_ref()`` after receiving the stored reference from the Execution API. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The cleanup task, also known as "garbage collection" is triggered using the Airflow CLI. The command to trigger the cleanup task is ``airflow state-store cleanup-task-store``. This process removes store rows according to the following rules: + +**Time-based expiry (task store only)** + Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task store only)** + Keys written with no explicit retention get an ``expires_at`` of now + default_retention_days computed at write time. Garbage collection deletes rows where ``expires_at < now()``." + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the garbage collection to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. 
+ +**Orphan sweep (asset store)** Review Comment: I cannot find an orphan-sweep implementation. `MetastoreStoreBackend.cleanup()` deletes `task_store` rows only (`WHERE expires_at < now()`, [metastore.py L376-L404](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/airflow-core/src/airflow/state/metastore.py#L376-L404)); the `state-store cleanup-task-store` CLI just calls it, and nothing in the scheduler sweeps asset store. Asset store rows are removed only by the FK `ondelete=CASCADE` ([asset_store.py L56-L64](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/airflow-core/src/airflow/models/asset_store.py#L56-L64)) when the parent `asset` row is hard-deleted, which is not the same as "no `asset_active` record" (a deactivated asset keeps its `asset` row). So as written, deactivated or renamed assets keep their store rows indefinitely. Either the sweep needs implementing, or this bullet (and asset-store.rst's "removed during the next garbage-collection pass") should describe the cascade-on-delete behavior instead.
########## airflow-core/docs/core-concepts/task-and-asset-store.rst: ########## @@ -0,0 +1,80 @@ + +..
_concepts:task-and-asset-store-overview: + +Task and Asset Store Overview +============================= + +.. versionadded:: 3.3 + +Airflow has always modeled tasks as stateless, idempotent units of work. A growing class of workloads, however, require some amount of data to be persisted outside of a task's return value, like a submitted job ID that must survive a worker crash, a watermark that advances run-by-run, or a row counter exposed for observability. Task store and Asset store fill that gap without touching the XCom or Variable systems. + +Task and Asset Store +-------------------- + +Task and Asset store provide two key/value stores to persist data like a job ID, watermark, or row count. These two stores are differentiated by *what* they are scoped to: + +.. list-table:: + :header-rows: 1 + :widths: 20 25 25 30 + + * - Store + - Scope + - Default lifetime + - Primary use case + * - **Task store** + - A single task Instance (dag_id + run_id + task_id + map_index) + - Configurable retention; cleared on task success when ``clear_on_success = True`` + - Survive retries, track in-flight jobs, checkpoint progress within a run, resume progress from checkpoint set by a past run + * - **Asset store** + - An asset (independent of any particular run) + - Persists indefinitely; removed only when the asset is deactivated + - Cross-run watermarks, incremental-load cursors, per-asset metadata + +Both stores accept JSON-able values. Values up to 64 KB are supported through the default metastore backend; larger payloads can be offloaded via a :ref:`custom worker-side backend <task-and-asset-store:worker-backends>`. Review Comment: I cannot find where the 64 KB limit is enforced. 
`value` is `Text().with_variant(MEDIUMTEXT, "mysql")` (~1 GB on Postgres, 16 MB on MySQL), and the only `value` validator on the write path checks JSON-representability and rejects null/NaN/Inf, not size ([execution_api/datamodels/task_store.py L40-L46](https://github.com/apache/airflow/blob/d88504564c506e7edfbd64cb0589f6ac9d6263c5/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/task_store.py#L40-L46)). If there is no size check, this states a hard limit that does not exist and points users at a worker-side backend they may not need. Could you point to the enforcement, or soften this to a recommended size?
########## airflow-core/docs/core-concepts/task-store.rst: ########## @@ -0,0 +1,278 @@ + +.. _concepts:task-store: + +.. spelling:word-list:: + + intra + Intra + checkpointing + +Task Store +========== + +.. versionadded:: 3.3 + +Task store is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata.
+ +Data persisted via task store is accessed through the task context via ``context["task_store"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``. + + +Accessing task store +-------------------- + +Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task store is available through the ``context`` dictionary via the ``task_store`` key. From there, it can be used to retrieve, set, delete, or clear data for a specific key-value pair. In this example, the ``job_id`` is retrieved from task store, then updated, before being deleted. All data for that task is then removed using the ``clear`` method. + +.. code-block:: python + + from airflow.sdk import task + import random + + + @task + def my_task(**context): Review Comment: The examples reach the store via `context["task_store"]` inside `def my_task(**context):`, but the text never says the task has to accept `**context` (or call `airflow.sdk.get_current_context()`). A reader using the common TaskFlow form `def my_task():` will hit a `NameError`. One sentence by this first example, and showing the `get_current_context()` form once, would close the gap.
########## airflow-core/docs/core-concepts/task-store.rst: ########## @@ -0,0 +1,278 @@ + +.. _concepts:task-store: + +.. spelling:word-list:: + + intra + Intra + checkpointing + +Task Store +========== + +.. versionadded:: 3.3 + +Task store is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata. + +Data persisted via task store is accessed through the task context via ``context["task_store"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``. + + +Accessing task store +-------------------- + +Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task store is available through the ``context`` dictionary via the ``task_store`` key. From there, it can be used to retrieve, set, delete, or clear data for a specific key-value pair. In this example, the ``job_id`` is retrieved from task store, then updated, before being deleted. All data for that task is then removed using the ``clear`` method. + +.. code-block:: python + + from airflow.sdk import task + import random + + + @task + def my_task(**context): + # Retrieve task_store from context + task_store = context["task_store"] + my_value = task_store.get("my_key", default="my_default_key") + + # Set the new value + new_value = f"It is {random.randint(1, 12 + 1)} o'clock" + task_store.set("my_key", new_value) + + # Delete the value + task_store.delete("my_key") + + # Clear all store entries for the task + task_store.clear() + +Reference +--------- + +``get(key, default)`` +~~~~~~~~~~~~~~~~~~~~~ + +Returns the stored JSON value, or the ``default`` value if the key does not exist. + +..
code-block:: python + + value = task_store.get( + "job_id", default="123456789" + ) # returns the value associated with `job_id` or the default value + +``set(key, value, *, retention=None)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Writes or overwrites a value for the specified key. Note, ``value`` can be any JSON-compatible type, except for ``None``. This includes: + +* ``str`` +* ``int`` +* ``float`` +* ``bool`` +* ``list`` +* ``dict`` + +The optional ``retention`` argument controls when the key expires: + +* ``timedelta(...)``: expire after the given duration from the time of the write (e.g. ``timedelta(hours=6)``). The expiry timestamp is computed on the worker before the value is sent to the API server. +* ``NEVER_EXPIRE``: the key never expires and is skipped during garbage collection, regardless of the global ``[state_store] default_retention_days`` setting. +* ``None`` (default): fall back to the global ``[state_store] default_retention_days`` config. + +.. important:: + + ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain integer number of days. Passing an integer raises a ``TypeError``. + + .. code-block:: python + + # correct + task_store.set("key", "val", retention=timedelta(days=7)) + + # wrong — raises TypeError + task_store.set("key", "val", retention=7) + +``NEVER_EXPIRE`` sentinel +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``: + +.. code-block:: python + + from airflow.sdk import NEVER_EXPIRE + + task_store.set("job_id", job_id, retention=NEVER_EXPIRE) + +``delete(key)`` +~~~~~~~~~~~~~~~ + +Deletes a single key. No-op if the key does not exist. + +.. code-block:: python + + task_store.delete("job_id") + +``clear(all_map_indices=False)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Deletes *all* task store keys for this task instance. + +For :doc:`mapped tasks </authoring-and-scheduling/dynamic-task-mapping>`, the default clears only the current map index. 
Pass ``all_map_indices=True`` to wipe the store across **every** mapped instance of the task (fleet-wide reset). + +.. code-block:: python + + # clear only this map index + task_store.clear() + + # clear all map indices (fleet-wide) + task_store.clear(all_map_indices=True) + + +Some Example Use Cases +---------------------- + +External job resumption +~~~~~~~~~~~~~~~~~~~~~~~ + +A common pattern for long-running external jobs: check whether a job ID is already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives +the default retention window. + +.. code-block:: python + + from datetime import timedelta + + from airflow.sdk import DAG, task + from airflow.sdk import NEVER_EXPIRE + + with DAG("spark_job_dag", schedule=None): + + @task + def run_spark_job(**context): + task_store = context["task_store"] + + # Check for an already-submitted job from a previous attempt. + job_id = task_store.get("job_id") + if job_id is None: + job_id = spark_client.submit_job(...) + # Store with NEVER_EXPIRE so the key is not garbage-collected before the job finishes + task_store.set("job_id", job_id, retention=NEVER_EXPIRE) + + # Reattach to the job and wait for completion. + result = spark_client.wait_for_completion(job_id) + return result + +On a retry, the task finds the stored ``job_id`` and reattaches instead of submitting a duplicate job. Another example of this sort of logic can be found in `example_task_store.py <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_task_store.py>`_. + +For ``BaseOperator`` subclasses, the :class:`~airflow.sdk.bases.resumablemixin.ResumableJobMixin` encapsulates this pattern. It persists the external job ID to task store after submission and, on retry, reconnects to an active job or resubmits if the prior job reached a terminal failure state. 
+
+Intra-task checkpointing
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+For tasks that process paginated or batched data, store the last-completed offset so a retry can resume mid-stream rather than restarting from the beginning.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, task
+
+    with DAG("paginated_ingest", schedule="@daily"):
+
+        @task
+        def ingest_pages(**context):
+            # Retrieve the task_store
+            task_store = context["task_store"]
+            raw = task_store.get("last_page")
+
+            start_page = raw + 1 if raw is not None else 1
+
+            for page in range(start_page, total_pages + 1):
+                fetch_and_load(page)
+                task_store.set("last_page", page)  # Update the task_store for reuse later
+
+
+On a retry, the task reads ``last_page`` and skips pages that were already processed.
+
+Progress metadata
+~~~~~~~~~~~~~~~~~
+
+Task store can expose in-progress metrics for observability — row counts, status strings, or lightweight JSON payloads — without requiring XCom or an external system.
+
+.. code-block:: python
+
+
+    from airflow.sdk import DAG, task
+
+    with DAG("row_ingest", schedule="@hourly"):
+
+        @task
+        def ingest_rows(**context):
+            task_store = context["task_store"]
+            total = 0
+
+            for batch in get_batches():
+                load(batch)
+                total += len(batch)
+                task_store.set(
+                    "progress",
+                    {"rows_loaded": total, "status": "running"},
+                )
+
+            task_store.set(
+                "progress",
+                {"rows_loaded": total, "status": "done"},
+            )
+
+The ``progress`` key is visible through the REST API and the Airflow UI while the task is running.

Review Comment:
This presents REST/UI visibility as a feature with no caveat. Unlike Variables and Connections, store values are persisted as a plain `Text` column with no Fernet encryption, and they are readable through the REST API and UI by anyone with access to the task or asset. Worth one explicit line here: do not put secrets or credentials in task or asset store.
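To make the suggested caveat concrete, here is a minimal sketch of a pattern the docs could recommend: keep only a non-sensitive reference (such as a connection ID) in the store and resolve the credential at use time. `InMemoryTaskStore` is a hypothetical stand-in that mirrors only the documented `get`/`set` surface so the snippet runs outside Airflow; it is not Airflow's implementation, and `record_progress` and `warehouse_default` are illustrative names that do not come from this PR.

```python
class InMemoryTaskStore:
    """Hypothetical stand-in for context["task_store"]; not Airflow's class."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        # Mirror the documented behaviour: return the default when the key is absent.
        return self._data.get(key, default)

    def set(self, key, value):
        # The docs under review say None is not an accepted value; the exact
        # exception type raised by the real store is an assumption here.
        if value is None:
            raise ValueError("value must not be None")
        self._data[key] = value


def record_progress(task_store, conn_id, rows_loaded):
    # Store a reference to the credential (the connection ID), never the
    # credential itself, because store values are unencrypted and readable
    # through the REST API and UI.
    task_store.set("progress", {"conn_id": conn_id, "rows_loaded": rows_loaded})


store = InMemoryTaskStore()
record_progress(store, conn_id="warehouse_default", rows_loaded=1200)
print(store.get("progress"))
```

Inside a real task the same `record_progress` call would receive `context["task_store"]` instead of the stand-in; the point of the sketch is only which data is safe to write.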
##########
airflow-core/docs/administration-and-deployment/task-and-asset-store.rst:
##########
@@ -0,0 +1,210 @@
+Configuration reference
+-----------------------
+
+All options live under the ``[state_store]`` section of ``airflow.cfg``.
+
+.. note::
+
+    The config section is ``[state_store]``, **not** ``[task_store]``.

Review Comment:
A reader meets "Task and Asset Store" throughout the concept pages, then lands on `[state_store]`, the `state-store` CLI, and `BaseStoreBackend`. This note only warns "not `[task_store]`"; the harder gap is that the whole config, CLI, and class namespace is `state_store`/`state-store`, which never appears in the concept docs. One sentence in the overview, something like "configuration, the CLI, and backend classes use the `state_store` name for this feature", would connect the two.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
