This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d6636f4227c AIP-72: Logging updated state in execution API server 
(#45074)
d6636f4227c is described below

commit d6636f4227c496d48f9fc007525de2e26e18973a
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Dec 19 20:09:10 2024 +0530

    AIP-72: Logging updated state in execution API server (#45074)
---
 airflow/api_fastapi/execution_api/routes/task_instances.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow/api_fastapi/execution_api/routes/task_instances.py
index 2184c2946b8..016f5222c79 100644
--- a/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -180,6 +180,8 @@ def ti_update_state(
     Not all state transitions are valid, and transitioning to some states 
requires extra information to be
     passed along. (Check out the datamodels for details, the rendered docs 
might not reflect this accurately)
     """
+    updated_state: str = ""
+
     # We only use UUID above for validation purposes
     ti_id_str = str(task_instance_id)
 
@@ -207,6 +209,7 @@ def ti_update_state(
         if ti_patch_payload.state == State.FAILED:
             # clear the next_method and next_kwargs
             query = query.values(next_method=None, next_kwargs=None)
+            updated_state = State.FAILED
     elif isinstance(ti_patch_payload, TIDeferredStatePayload):
         # Calculate timeout if it was passed
         timeout = None
@@ -231,6 +234,7 @@ def ti_update_state(
             next_kwargs=ti_patch_payload.trigger_kwargs,
             trigger_timeout=timeout,
         )
+        updated_state = State.DEFERRED
     elif isinstance(ti_patch_payload, TIRescheduleStatePayload):
         task_instance = session.get(TI, ti_id_str)
         actual_start_date = timezone.utcnow()
@@ -252,11 +256,12 @@ def ti_update_state(
         query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
         # clear the next_method and next_kwargs so that none of the retries 
pick them up
         query = query.values(state=State.UP_FOR_RESCHEDULE, next_method=None, 
next_kwargs=None)
+        updated_state = State.UP_FOR_RESCHEDULE
     # TODO: Replace this with FastAPI's Custom Exception handling:
     # 
https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
     try:
         result = session.execute(query)
-        log.info("TI %s state updated: %s row(s) affected", ti_id_str, 
result.rowcount)
+        log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str, 
updated_state, result.rowcount)
     except SQLAlchemyError as e:
         log.error("Error updating Task Instance state: %s", e)
         raise HTTPException(

Reply via email to