kaxil commented on code in PR #67299:
URL: https://github.com/apache/airflow/pull/67299#discussion_r3366363923

##########
airflow-core/docs/core-concepts/task-store.rst:
##########
@@ -0,0 +1,278 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-store:
+
+.. spelling:word-list::
+
+    intra
+    Intra
+    checkpointing
+
+Task Store
+==========
+
+.. versionadded:: 3.3
+
+Task store is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Data persisted via task store is accessed through the task context via ``context["task_store"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task store
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task store is available through the ``context`` dictionary via the ``task_store`` key. From there, it can be used to retrieve, set, delete, or clear data for a specific key-value pair. In this example, the ``job_id`` is retrieved from task store, then updated, before being deleted. All data for that task is then removed using the ``clear`` method.
+
+.. code-block:: python
+
+    from airflow.sdk import task
+    import random
+
+
+    @task
+    def my_task(**context):
+        # Retrieve task_store from context
+        task_store = context["task_store"]
+        my_value = task_store.get("my_key", default="my_default_key")
+
+        # Set the new value
+        new_value = f"It is {random.randint(1, 12 + 1)} o'clock"
+        task_store.set("my_key", new_value)
+
+        # Delete the value
+        task_store.delete("my_key")
+
+        # Clear all store entries for the task
+        task_store.clear()
+
+Reference
+---------
+
+``get(key, default)``
+~~~~~~~~~~~~~~~~~~~~~
+
+Returns the stored JSON value, or the ``default`` value if the key does not exist.
+
+.. code-block:: python
+
+    value = task_store.get(
+        "job_id", default="123456789"
+    )  # returns the value associated with `job_id` or the default value
+
+``set(key, value, *, retention=None)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Writes or overwrites a value for the specified key. Note, ``value`` can be any JSON-compatible type, except for ``None``. This includes:
+
+* ``str``
+* ``int``
+* ``float``
+* ``bool``
+* ``list``
+* ``dict``
+
+The optional ``retention`` argument controls when the key expires:
+
+* ``timedelta(...)``: expire after the given duration from the time of the write (e.g. ``timedelta(hours=6)``). The expiry timestamp is computed on the worker before the value is sent to the API server.
+* ``NEVER_EXPIRE``: the key never expires and is skipped during garbage collection, regardless of the global ``[state_store] default_retention_days`` setting.
+* ``None`` (default): fall back to the global ``[state_store] default_retention_days`` config.
+
+.. important::
+
+    ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain integer number of days. Passing an integer raises a ``TypeError``.
+
+    .. code-block:: python
+
+        # correct
+        task_store.set("key", "val", retention=timedelta(days=7))
+
+        # wrong — raises TypeError
+        task_store.set("key", "val", retention=7)
+
+``NEVER_EXPIRE`` sentinel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``:
+
+.. code-block:: python
+
+    from airflow.sdk import NEVER_EXPIRE
+
+    task_store.set("job_id", job_id, retention=NEVER_EXPIRE)
+
+``delete(key)``
+~~~~~~~~~~~~~~~
+
+Deletes a single key. No-op if the key does not exist.
+
+.. code-block:: python
+
+    task_store.delete("job_id")
+
+``clear(all_map_indices=False)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Deletes *all* task store keys for this task instance.
+
+For :doc:`mapped tasks </authoring-and-scheduling/dynamic-task-mapping>`, the default clears only the current map index. Pass ``all_map_indices=True`` to wipe the store across **every** mapped instance of the task (fleet-wide reset).
+
+.. code-block:: python
+
+    # clear only this map index
+    task_store.clear()
+
+    # clear all map indices (fleet-wide)
+    task_store.clear(all_map_indices=True)
+
+
+Some Example Use Cases
+----------------------
+
+External job resumption
+~~~~~~~~~~~~~~~~~~~~~~~
+
+A common pattern for long-running external jobs: check whether a job ID is already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives
+the default retention window.
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow.sdk import DAG, task
+    from airflow.sdk import NEVER_EXPIRE
+
+    with DAG("spark_job_dag", schedule=None):
+
+        @task
+        def run_spark_job(**context):
+            task_store = context["task_store"]
+
+            # Check for an already-submitted job from a previous attempt.
+            job_id = task_store.get("job_id")
+            if job_id is None:
+                job_id = spark_client.submit_job(...)
+                # Store with NEVER_EXPIRE so the key is not garbage-collected before the job finishes
+                task_store.set("job_id", job_id, retention=NEVER_EXPIRE)
+
+            # Reattach to the job and wait for completion.
+            result = spark_client.wait_for_completion(job_id)
+            return result
+
+On a retry, the task finds the stored ``job_id`` and reattaches instead of submitting a duplicate job. Another example of this sort of logic can be found in `example_task_store.py <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_task_store.py>`_.
+
+For ``BaseOperator`` subclasses, the :class:`~airflow.sdk.bases.resumablemixin.ResumableJobMixin` encapsulates this pattern. It persists the external job ID to task store after submission and, on retry, reconnects to an active job or resubmits if the prior job reached a terminal failure state.
+
+Intra-task checkpointing
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+For tasks that process paginated or batched data, store the last-completed offset so a retry can resume mid-stream rather than restarting from the beginning.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, task
+
+    with DAG("paginated_ingest", schedule="@daily"):
+
+        @task
+        def ingest_pages(**context):
+            # Retrieve the task_store
+            task_store = context["task_store"]
+            raw = task_store.get("last_page")
+
+            start_page = raw + 1 if raw is not None else 1
+
+            for page in range(start_page, total_pages + 1):
+                fetch_and_load(page)
+                task_store.set("last_page", page)  # Update the task_store for reuse later
+
+
+On a retry, the task reads ``last_page`` and skips pages that were already processed.
+
+Progress metadata
+~~~~~~~~~~~~~~~~~
+
+Task store can expose in-progress metrics for observability — row counts, status strings, or lightweight JSON payloads — without requiring XCom or an external system.
+
+.. code-block:: python
+
+
+    from airflow.sdk import DAG, task
+
+    with DAG("row_ingest", schedule="@hourly"):
+
+        @task
+        def ingest_rows(**context):
+            task_store = context["task_store"]
+            total = 0
+
+            for batch in get_batches():
+                load(batch)
+                total += len(batch)
+                task_store.set(
+                    "progress",
+                    {"rows_loaded": total, "status": "running"},
+                )
+
+            task_store.set(
+                "progress",
+                {"rows_loaded": total, "status": "done"},
+            )
+
+The ``progress`` key is visible through the REST API and the Airflow UI while the task is running.

Review Comment:
This presents REST/UI visibility as a feature with no caveat. Unlike Variables and Connections, store values are persisted as a plain `Text` column with no Fernet encryption, and they are readable through the REST API and UI by anyone with access to the task or asset. Worth one explicit line here: do not put secrets or credentials in task or asset store.
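
As a rough illustration of the secrets caveat raised in the review comment, the sketch below shows one way a Dag author might guard against writing credentials into the store. `find_secret_like_keys`, `safe_set`, `InMemoryStore`, and the key-name pattern are all hypothetical names invented for this example and are not part of Airflow. Only the `set(key, value)` call shape is taken from the doc under review.

```python
import re

# Hypothetical heuristic: key names that suggest a credential.
SECRET_KEY_PATTERN = re.compile(
    r"(password|passwd|secret|token|api[_-]?key|credential)", re.IGNORECASE
)


def find_secret_like_keys(payload, prefix=""):
    """Return paths of dict keys whose *names* look like secrets."""
    hits = []
    if isinstance(payload, dict):
        for key, value in payload.items():
            path = f"{prefix}.{key}" if prefix else str(key)
            if SECRET_KEY_PATTERN.search(str(key)):
                hits.append(path)
            # Recurse so nested payloads are checked too.
            hits.extend(find_secret_like_keys(value, path))
    elif isinstance(payload, list):
        for index, item in enumerate(payload):
            hits.extend(find_secret_like_keys(item, f"{prefix}[{index}]"))
    return hits


def safe_set(task_store, key, value):
    """Write to a task-store-like object unless a key name looks like a secret."""
    hits = find_secret_like_keys({key: value})
    if hits:
        raise ValueError(f"Refusing to store secret-like keys: {hits}")
    task_store.set(key, value)


class InMemoryStore:
    """Stand-in for the real task store, used only to exercise the sketch."""

    def __init__(self):
        self.data = {}

    def set(self, key, value):
        self.data[key] = value
```

A name-based check like this is only a heuristic: it misses secrets stored under innocuous keys and can flag harmless names, so an explicit warning in the doc would still be needed.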
