gemini-code-assist[bot] commented on code in PR #38262:
URL: https://github.com/apache/beam/pull/38262#discussion_r3157175725
##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -126,8 +138,28 @@ def __init__(
def initialize_pool(parallelism):
return lambda: ThreadPoolExecutor(max_workers=parallelism)
+ @staticmethod
+ def _run_event_loop():
+ """Sets up and runs the asyncio event loop in a background thread."""
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ AsyncWrapper._event_loop = loop
+ AsyncWrapper._loop_started.set()
+ loop.run_forever()
+ loop.close()
+
@staticmethod
def reset_state():
+ if AsyncWrapper._event_loop:
+ AsyncWrapper._event_loop.call_soon_threadsafe(
+ AsyncWrapper._event_loop.stop)
+ if AsyncWrapper._event_loop_thread:
+ AsyncWrapper._event_loop_thread.join()
+
+ AsyncWrapper._event_loop = None
+ AsyncWrapper._event_loop_thread = None
+ AsyncWrapper._loop_started.clear()
+
for pool in AsyncWrapper._pool.values():
pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
wait=True, cancel_futures=True)
Review Comment:

The `reset_state` method modifies and accesses class-level variables
(`_event_loop`, `_event_loop_thread`, `_loop_started`) that are also used in
the `setup` method. To prevent race conditions, especially in multi-threaded
environments or during test execution, this method should acquire
`AsyncWrapper._lock` before modifying these shared resources.
```suggestion
@staticmethod
def reset_state():
with AsyncWrapper._lock:
if AsyncWrapper._event_loop:
AsyncWrapper._event_loop.call_soon_threadsafe(
AsyncWrapper._event_loop.stop)
if AsyncWrapper._event_loop_thread:
AsyncWrapper._event_loop_thread.join()
AsyncWrapper._event_loop = None
AsyncWrapper._event_loop_thread = None
AsyncWrapper._loop_started.clear()
for pool in AsyncWrapper._pool.values():
pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
wait=True, cancel_futures=True)
```
##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -190,6 +228,52 @@ def sync_fn_process(self, element, *args, **kwargs):
return to_return
+ async def async_fn_process(self, element, *args, **kwargs):
+ """Makes the call to the wrapped dofn's start_bundle, process
+ and finish_bundle methods for asynchronous DoFns.
+
+ Args:
+ element: The element to process.
+ *args: Any additional arguments to pass to the wrapped dofn's process
+ method.
+ **kwargs: Any additional keyword arguments to pass to the wrapped dofn's
+ process method.
+
+ Returns:
+ A list of elements produced by the input element.
+ """
+ self._sync_fn.start_bundle()
+ process_result = self._sync_fn.process(element, *args, **kwargs)
+ bundle_result = self._sync_fn.finish_bundle()
+
+ if not process_result:
+ process_result = []
+ elif isinstance(process_result, AsyncIterable):
+ temp = []
+ async for item in process_result:
+ temp.append(item)
+ process_result = temp
+ elif not isinstance(process_result, (GeneratorType, Iterable)):
+ process_result = [process_result]
+
+ if not bundle_result:
+ bundle_result = []
+ elif isinstance(bundle_result, AsyncIterable):
+ temp = []
+ async for item in bundle_result:
+ temp.append(item)
+ bundle_result = temp
+ elif not isinstance(bundle_result, (GeneratorType, Iterable)):
+ bundle_result = [bundle_result]
+
+ to_return = []
+ for x in process_result:
+ to_return.append(x)
+ for x in bundle_result:
+ to_return.append(x)
+
+ return to_return
Review Comment:

The current implementation of `async_fn_process` has two significant issues:
1. **Missing `await` for Coroutines**: If the wrapped DoFn methods
(`process`, `finish_bundle`) are defined as `async def`, they return coroutine
objects. These are currently not being awaited, meaning the coroutines
themselves (rather than their results) will be returned, which is incorrect.
2. **Blocking the Event Loop**: If the wrapped DoFn is synchronous and
performs blocking I/O, calling it directly on the event loop thread will block
the entire loop, serializing all processing and defeating the purpose of using
`asyncio` for parallelism.
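The suggested refactoring addresses the first issue: it awaits coroutines,
drains async iterables, and simplifies the result collection logic. It does not
address the second issue, because a synchronous DoFn is still called directly on
the event loop thread; blocking calls would additionally need to be offloaded,
for example with `loop.run_in_executor`.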
```python
async def async_fn_process(self, element, *args, **kwargs):
"""Makes the call to the wrapped dofn's start_bundle, process
and finish_bundle methods for asynchronous DoFns.
Args:
element: The element to process.
*args: Any additional arguments to pass to the wrapped dofn's process
method.
**kwargs: Any additional keyword arguments to pass to the wrapped dofn's
process method.
Returns:
A list of elements produced by the input element.
"""
async def _collect(result):
if result is None:
return []
if asyncio.iscoroutine(result):
result = await result
if isinstance(result, AsyncIterable):
return [item async for item in result]
if isinstance(result, (GeneratorType, Iterable)) and not isinstance(
result, (str, bytes)):
return list(result)
return [result]
start_res = self._sync_fn.start_bundle()
if asyncio.iscoroutine(start_res):
await start_res
process_result = await _collect(
self._sync_fn.process(element, *args, **kwargs))
bundle_result = await _collect(self._sync_fn.finish_bundle())
return process_result + bundle_result
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]