This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cc74161a92 Make `example_task_state_store` use minimal context (#68556)
2cc74161a92 is described below

commit 2cc74161a92268164bd9c9459051263852a8e6e1
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 15 13:15:21 2026 +0530

    Make `example_task_state_store` use minimal context (#68556)
---
 .../src/airflow/example_dags/example_task_state_store.py      | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/example_dags/example_task_state_store.py b/airflow-core/src/airflow/example_dags/example_task_state_store.py
index c19bd611de9..3137374aa9d 100644
--- a/airflow-core/src/airflow/example_dags/example_task_state_store.py
+++ b/airflow-core/src/airflow/example_dags/example_task_state_store.py
@@ -58,19 +58,16 @@ with DAG(
 ):
 
     @task(retries=2, retry_delay=timedelta(seconds=5))
-    def run_job(**context):
-        task_state_store = context["task_state_store"]
-        try_number = context["ti"].try_number
-
+    def run_job(task_state_store=None, ti=None):
         job_id = task_state_store.get("job_id")
         if job_id:
-            print(f"Try {try_number}: reattaching to existing job: {job_id}")
+            print(f"Try {ti.try_number}: reattaching to existing job: {job_id}")
         else:
             job_id = _submit_job()
             # Store with NEVER_EXPIRE so the job ID survives across all retries.
             task_state_store.set("job_id", job_id, retention=NEVER_EXPIRE)
             task_state_store.set("submitted_at", datetime.now(tz=timezone.utc).isoformat())
-            print(f"Try {try_number}: submitted job: {job_id}")
+            print(f"Try {ti.try_number}: submitted job: {job_id}")
 
             # Simulate a crash after submission on the first attempt.
             # The retry will reattach to the same job instead of submitting a duplicate.
@@ -83,7 +80,7 @@ with DAG(
         task_state_store.set("status", "complete")
         task_state_store.set("result", result)
 
-        print(f"Try {try_number}: job complete — {result['rows_written']} rows written")
+        print(f"Try {ti.try_number}: job complete — {result['rows_written']} rows written")
         return result["rows_written"]
 
     run_job()

Reply via email to