Nataneljpwd commented on code in PR #61778:
URL: https://github.com/apache/airflow/pull/61778#discussion_r2799035048
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -958,6 +959,12 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
follow = self.logging_interval is None
last_log_time = event.get("last_log_time")
+ if event["status"] == "timeout":
+     pod_phase = self.pod.status.phase if self.pod.status and self.pod.status.phase else None
+     if pod_phase in {PodPhase.RUNNING, *PodPhase.terminal_states}:
+         self.log.info("Pod has transitioned from pending state after timeout, deferring again")
+         self.invoke_defer_method(last_log_time=last_log_time, context=context)
Review Comment:
Since the code execution flow is not interrupted after this call, callbacks
will still run. Are we sure we want this behavior?
If a deferred task failed because of a timeout (an infrastructure error)
rather than a user error, should we notify the user about it?
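
To make the concern concrete, here is a minimal sketch with a toy class, not
the real operator. It assumes `invoke_defer_method` returns normally; whether
the real method returns or raises is not visible in this hunk. `SketchOperator`,
`return_after_defer`, and the `calls` list are hypothetical names used only for
illustration.

```python
class SketchOperator:
    """Toy stand-in for illustration; not the real KubernetesPodOperator."""

    def __init__(self, return_after_defer: bool) -> None:
        self.return_after_defer = return_after_defer
        self.calls: list[str] = []

    def invoke_defer_method(self) -> None:
        # Assumption: this returns normally instead of raising.
        self.calls.append("defer")

    def trigger_reentry(self, event: dict) -> None:
        if event["status"] == "timeout":
            self.invoke_defer_method()
            if self.return_after_defer:
                return  # explicit exit: nothing below runs
        # Stand-in for the rest of the method, including callbacks.
        self.calls.append("callbacks")


without_return = SketchOperator(return_after_defer=False)
without_return.trigger_reentry({"status": "timeout"})

with_return = SketchOperator(return_after_defer=True)
with_return.trigger_reentry({"status": "timeout"})
```

Under that assumption, only the variant with the explicit `return` skips the
callback step, so an explicit exit (or a comment stating that the call never
returns) would make the intent unambiguous.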
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -946,6 +946,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
"""
self.pod = None
xcom_sidecar_output = None
+ skip_cleanup = False
Review Comment:
I think this flag can be avoided, and avoiding it would reduce the mental
load of reading this code.
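
One possible shape, as a rough sketch only: it assumes the flag exists to skip
a cleanup step in a `finally` block on the timeout path, which this hunk does
not show. The function names and the `log` list are hypothetical and stand in
for the real method body.

```python
def reentry_with_flag(event: dict, log: list[str]) -> None:
    # Flag-based version: the reader has to track skip_cleanup down to finally.
    skip_cleanup = False
    try:
        if event["status"] == "timeout":
            skip_cleanup = True
            log.append("defer")
            return
        log.append("work")
    finally:
        if not skip_cleanup:
            log.append("cleanup")


def reentry_without_flag(event: dict, log: list[str]) -> None:
    # Flag-free version: handle the timeout path before entering the try block,
    # so the finally clause only ever covers the path that needs cleanup.
    if event["status"] == "timeout":
        log.append("defer")
        return
    try:
        log.append("work")
    finally:
        log.append("cleanup")
```

Both functions record the same steps for the timeout and non-timeout cases;
the second just has no state to carry between the branch and the cleanup.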