SameerMesiah97 opened a new issue, #60495:
URL: https://github.com/apache/airflow/issues/60495

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-cncf-kubernetes==10.12.0`
   
   ### Apache Airflow version
   
   main
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When using the `watch_pod_events` method of `AsyncKubernetesHook`, the event 
watch terminates silently after `timeout_seconds`, even if the pod is still 
running and continuing to emit events.
   
   The Kubernetes Watch API closes watch streams once `timeout_seconds` has 
elapsed, without signaling an error. The current implementation treats this 
normal stream termination as completion and exits the generator instead of 
reconnecting. As a result, events emitted after the first watch timeout are 
dropped, and the Airflow task completes successfully without having consumed 
all pod events.
   
   This behavior is silent and surprising: no error is raised, no warning is 
logged, and users receive incomplete event logs despite the pod still being 
alive.
   
   ### What you think should happen instead
   
   `watch_pod_events` should treat watch stream termination as a reconnect 
signal, not as task completion.
   
   The method should:
   
   - Reconnect when the watch stream ends normally
   - Resume watching from the last observed `resourceVersion`
   - Continue yielding events for as long as the pod exists
   - Terminate only when:
     - The pod reaches a terminal phase (Succeeded / Failed)
     - The pod is deleted
     - The task is cancelled
     - An irrecoverable Kubernetes API error occurs (e.g. authorization failure)
   
   This matches Kubernetes watch semantics and user expectations for a 
long-running event stream.
   
   ### How to reproduce
   
   1. Create a pod (for this reproduction, named `event-spammer` in the 
`default` namespace) that continuously emits events for a duration longer than 
the watch timeout (for example, a pod that runs for several minutes).
   
   2. Run the following DAG, ensuring that the pod name and namespace in the 
DAG match the pod you created.
   ```python
   from datetime import datetime
   import asyncio
   
   from airflow.decorators import dag, task
   from airflow.providers.cncf.kubernetes.hooks.kubernetes import 
AsyncKubernetesHook
   
   
   @task
   def watch_pod_events():
       async def _watch():
           hook = AsyncKubernetesHook(in_cluster=True)
   
           async for event in hook.watch_pod_events(
               name="event-spammer",
               namespace="default",
               timeout_seconds=30,  # triggers premature termination
           ):
               print(f"[EVENT] {event.message}")
   
       asyncio.run(_watch())
   
   
   @dag(
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   )
   def k8s_watch_timeout_repro():
       watch_pod_events()
   
   
   dag = k8s_watch_timeout_repro()
   ```
   3. Trigger the DAG and observe the task logs.
   
   4. The task stops logging events after approximately `timeout_seconds`, 
even though the pod is still running and emitting events. No error or warning 
indicates that events may have been missed or that the watch was terminated 
prematurely.
   
   ### Anything else
   
   **This is unlikely to be a documentation-only issue.** 
   
   The current behavior violates the principle of least surprise. A function 
named `watch_pod_events`, whose documentation (see 'async 
watch_pod_events' on this 
[page](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/hooks/kubernetes/index.html))
 states that it “watches pod events,” reasonably implies a continuous event 
stream for the lifetime of the pod, not a best-effort stream that silently 
stops after a fixed timeout.
   
   While it is possible that some existing deployments have come to rely on the 
current behavior, this is unlikely to be intentional. In practice, users who 
require reliable event streaming already need to implement their own 
workarounds (e.g. manual polling loops, external controllers, or repeated watch 
restarts) to compensate for this limitation.
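   A "repeated watch restarts" workaround of that kind might look like the sketch below. It assumes a hook object exposing `watch_pod_events(...)` and `get_pod(...)` (as `AsyncKubernetesHook` does); note that, unlike a proper fix inside the hook, it does not resume from a `resourceVersion`, so events can be duplicated or dropped between restarts.

```python
# Hypothetical user-level workaround, not provider code: restart the
# watch in a loop until the pod reaches a terminal phase. Does NOT
# resume from a resourceVersion, so it is best-effort only.
async def watch_until_done(hook, name: str, namespace: str) -> list:
    seen = []
    while True:
        async for event in hook.watch_pod_events(
            name=name, namespace=namespace, timeout_seconds=30
        ):
            seen.append(event)
        # The watch closed after timeout_seconds; poll the pod to decide
        # whether to restart it.
        pod = await hook.get_pod(name=name, namespace=namespace)
        if pod.status.phase in ("Succeeded", "Failed"):
            return seen
```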
   
   The proposed change does alter behavior, but it does not break existing 
deployments:
   
   - Consumers that stop iteration early will continue to work unchanged
   - No new exceptions are introduced
   - The watch still terminates on pod completion, deletion, or cancellation
   - The API surface remains identical
   
   Instead, the change aligns runtime behavior with what users can reasonably 
expect based on the function name, docstring, and public API contract. It makes 
the default behavior safer, more intuitive, and more consistent with Kubernetes 
watch semantics, without removing any existing escape hatches.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

