This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2cc74161a92 Make `example_task_state_store` use minimal context (#68556)
2cc74161a92 is described below
commit 2cc74161a92268164bd9c9459051263852a8e6e1
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 15 13:15:21 2026 +0530
Make `example_task_state_store` use minimal context (#68556)
---
.../src/airflow/example_dags/example_task_state_store.py | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/example_dags/example_task_state_store.py b/airflow-core/src/airflow/example_dags/example_task_state_store.py
index c19bd611de9..3137374aa9d 100644
--- a/airflow-core/src/airflow/example_dags/example_task_state_store.py
+++ b/airflow-core/src/airflow/example_dags/example_task_state_store.py
@@ -58,19 +58,16 @@ with DAG(
):
@task(retries=2, retry_delay=timedelta(seconds=5))
- def run_job(**context):
- task_state_store = context["task_state_store"]
- try_number = context["ti"].try_number
-
+ def run_job(task_state_store=None, ti=None):
job_id = task_state_store.get("job_id")
if job_id:
- print(f"Try {try_number}: reattaching to existing job: {job_id}")
+ print(f"Try {ti.try_number}: reattaching to existing job: {job_id}")
else:
job_id = _submit_job()
# Store with NEVER_EXPIRE so the job ID survives across all retries.
task_state_store.set("job_id", job_id, retention=NEVER_EXPIRE)
task_state_store.set("submitted_at", datetime.now(tz=timezone.utc).isoformat())
- print(f"Try {try_number}: submitted job: {job_id}")
+ print(f"Try {ti.try_number}: submitted job: {job_id}")
# Simulate a crash after submission on the first attempt.
# The retry will reattach to the same job instead of submitting a duplicate.
@@ -83,7 +80,7 @@ with DAG(
task_state_store.set("status", "complete")
task_state_store.set("result", result)
- print(f"Try {try_number}: job complete — {result['rows_written']} rows written")
+ print(f"Try {ti.try_number}: job complete — {result['rows_written']} rows written")
return result["rows_written"]
run_job()