cozos commented on code in PR #26526:
URL: https://github.com/apache/beam/pull/26526#discussion_r1185957495
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2321,6 +2348,43 @@ def _remote_teardown(cls):
     cls._fn = None
+class _TimeoutDoFn(DoFn):
+  """Process method run in a separate thread allowing timeouts.
+  """
+  def __init__(self, fn, timeout=None):
+    self._fn = fn
+    self._timeout = timeout
+    self._pool = None
+
+  def __getattribute__(self, name):
+    if (name.startswith('__') or name in self.__dict__ or
+        name in type(self).__dict__):
+      return object.__getattribute__(self, name)
+    else:
+      return getattr(self._fn, name)
+
+  def process(self, *args, **kwargs):
+    if self._pool is None:
+      self._pool = concurrent.futures.ThreadPoolExecutor(10)
+    # Ensure we iterate over the entire output list in the given amount of
+    # time.
+    try:
+      return self._pool.submit(
+          lambda: list(self._fn.process(*args, **kwargs))).result(
+              self._timeout)
+    except TimeoutError:
+      self._pool.shutdown(wait=False)
+      self._pool = None
+      raise
+
+  def teardown(self):
+    try:
+      self._fn.teardown()
+    finally:
+      if self._pool is not None:
+        self._pool.shutdown(wait=False)
Review Comment:
@robertwb I think this comes with lots of problems. `shutdown(wait=False)` is
non-blocking, but threads that are already running will NOT be cancelled
(Python has no API to forcibly stop a running thread). That means the threads
in the pool become zombies: they keep running, consuming memory and CPU, until
their execution finishes. Users of this transform will think they have a
memory leak.
From
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown
```
If wait is False then this method will return immediately and the resources
associated with the executor will be freed when all pending futures are done
executing. Regardless of the value of wait, the entire Python program will not
exit until all pending futures are done executing.
If cancel_futures is True, this method will cancel all pending futures that
the executor has not started running. Any futures that are completed or running
won’t be cancelled, regardless of the value of cancel_futures.
```
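A minimal sketch of the documented `cancel_futures` behavior, using only the standard library and assuming Python 3.9+ (where `cancel_futures` was added). `blocking_task` and `queued_task` are hypothetical stand-ins, not Beam code. The queued task gets cancelled, while the one already executing is left untouched.

```python
import concurrent.futures
import threading
import time

# Hypothetical stand-ins for DoFn work; neither is Beam code.
release = threading.Event()


def blocking_task():
    # Simulates a long-running process() call: it returns only once
    # `release` is set.
    release.wait()
    return "running task finished"


def queued_task():
    return "queued task finished"


# One worker, so the second task waits in the queue behind the first.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocking_task)
pending = pool.submit(queued_task)

# Wait until the worker has actually started the first task.
while not running.running():
    time.sleep(0.01)

# Python 3.9+: cancel queued work and return immediately.
pool.shutdown(wait=False, cancel_futures=True)

pending_was_cancelled = pending.cancelled()  # True: it never started
running_was_cancelled = running.cancelled()  # False: already running
running_was_done = running.done()            # False: still executing
print(pending_was_cancelled, running_was_cancelled, running_was_done)

# shutdown() offered no way to stop the running task; it only ends
# because this demo controls `release`.
release.set()
print(running.result())
```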
Try putting an infinite loop in the `process` method of a DoFn wrapped by
`_TimeoutDoFn`.
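A minimal sketch of that experiment, using only the standard library (no Beam imports) and mirroring the submit/`result(timeout)`/`shutdown(wait=False)` pattern from the diff. `stuck_process` and the `stop` flag are hypothetical: the flag exists only so the demo can terminate, whereas a truly infinite loop would keep the worker thread alive for the life of the process and, per the docs quoted above, block interpreter exit.

```python
import concurrent.futures
import threading
import time

# Hypothetical stand-in for a user DoFn's process(). The `stop` flag exists
# only so this demo can exit; a genuine infinite loop would have no way out.
stop = threading.Event()


def stuck_process(element):
    while not stop.is_set():
        time.sleep(0.01)  # loop without ever producing output
    yield element


pool = concurrent.futures.ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="timeout-demo")
future = pool.submit(lambda: list(stuck_process("x")))

# Same pattern as _TimeoutDoFn.process: wait up to the timeout, then give up.
# concurrent.futures.TimeoutError is used because before Python 3.11 it is
# a different class from the builtin TimeoutError.
timed_out = False
try:
    future.result(timeout=0.1)
except concurrent.futures.TimeoutError:
    timed_out = True
    pool.shutdown(wait=False)

# The caller has moved on, but the worker thread keeps looping.
time.sleep(0.1)
still_running = future.running()
worker_alive = any(
    t.name.startswith("timeout-demo") and t.is_alive()
    for t in threading.enumerate())
print(timed_out, still_running, worker_alive)  # True True True

stop.set()  # only possible because the demo added a stop flag
print(future.result())  # ['x']
```

After `shutdown(wait=False)` returns, `future.running()` is still `True` and the `timeout-demo` worker thread still appears in `threading.enumerate()`; nothing in the wrapper frees it.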