tvalentyn commented on code in PR #35391:
URL: https://github.com/apache/beam/pull/35391#discussion_r2258273264
##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -230,7 +232,12 @@ def default_factory(id):
status_address,
self._bundle_processor_cache,
self._state_cache,
- enable_heap_dump) # type: Optional[FnApiWorkerStatusHandler]
+ enable_heap_dump,
+ element_processing_timeout_minutes=self.
+ element_processing_timeout_minutes
+ ) # type: Optional[FnApiWorkerStatusHandler]
+ except TimeoutError as e:
Review Comment:
+1, the TimeoutError won't be caught here - this `try` block finishes after
status handler is constructed.
+1 - the error will be thrown on the status handler thread, uncaught, and it
should simply terminate the status handler thread, but the main thread will
keep running, SDK process shouldn't terminate ( did you see a different
behavior when you tested this change? )
Exiting the from StatusHandler thread should work (if we called
`_shutdown_due_to_element_processing_timeout`) from the status handler. However
it would be better if we flushed the logs, as is attempted to do on the main
thread
https://github.com/apache/beam/blob/4114f7c314a8ae3dccc9e2b78f2474b6337a6e7c/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L214-L219
.
We could do this:
- make logs handler a process-level global variable
- add a helpers in sdk_worker_main to flush log handler (if defined) +
shut down the process. This would be callable from any thread.
- call these from worker_status thread when timeout is reached
Even better would be to catch the expression on the main thread but that
requires either message passing from child threads to main thread or
refactoring thread management using `concurrent.futures`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]