This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d6636f4227c AIP-72: Logging updated state in execution API server (#45074)
d6636f4227c is described below
commit d6636f4227c496d48f9fc007525de2e26e18973a
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Dec 19 20:09:10 2024 +0530
AIP-72: Logging updated state in execution API server (#45074)
---
airflow/api_fastapi/execution_api/routes/task_instances.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow/api_fastapi/execution_api/routes/task_instances.py
index 2184c2946b8..016f5222c79 100644
--- a/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -180,6 +180,8 @@ def ti_update_state(
Not all state transitions are valid, and transitioning to some states requires extra information to be
passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately)
"""
+ updated_state: str = ""
+
# We only use UUID above for validation purposes
ti_id_str = str(task_instance_id)
@@ -207,6 +209,7 @@ def ti_update_state(
if ti_patch_payload.state == State.FAILED:
# clear the next_method and next_kwargs
query = query.values(next_method=None, next_kwargs=None)
+ updated_state = State.FAILED
elif isinstance(ti_patch_payload, TIDeferredStatePayload):
# Calculate timeout if it was passed
timeout = None
@@ -231,6 +234,7 @@ def ti_update_state(
next_kwargs=ti_patch_payload.trigger_kwargs,
trigger_timeout=timeout,
)
+ updated_state = State.DEFERRED
elif isinstance(ti_patch_payload, TIRescheduleStatePayload):
task_instance = session.get(TI, ti_id_str)
actual_start_date = timezone.utcnow()
@@ -252,11 +256,12 @@ def ti_update_state(
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
# clear the next_method and next_kwargs so that none of the retries pick them up
query = query.values(state=State.UP_FOR_RESCHEDULE, next_method=None, next_kwargs=None)
+ updated_state = State.UP_FOR_RESCHEDULE
# TODO: Replace this with FastAPI's Custom Exception handling:
# https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
try:
result = session.execute(query)
- log.info("TI %s state updated: %s row(s) affected", ti_id_str, result.rowcount)
+ log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str, updated_state, result.rowcount)
except SQLAlchemyError as e:
log.error("Error updating Task Instance state: %s", e)
raise HTTPException(