gemini-code-assist[bot] commented on code in PR #38262:
URL: https://github.com/apache/beam/pull/38262#discussion_r3157175725


##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -126,8 +138,28 @@ def __init__(
   def initialize_pool(parallelism):
     return lambda: ThreadPoolExecutor(max_workers=parallelism)
 
+  @staticmethod
+  def _run_event_loop():
+    """Sets up and runs the asyncio event loop in a background thread."""
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    AsyncWrapper._event_loop = loop
+    AsyncWrapper._loop_started.set()
+    loop.run_forever()
+    loop.close()
+
   @staticmethod
   def reset_state():
+    if AsyncWrapper._event_loop:
+      AsyncWrapper._event_loop.call_soon_threadsafe(
+          AsyncWrapper._event_loop.stop)
+    if AsyncWrapper._event_loop_thread:
+      AsyncWrapper._event_loop_thread.join()
+
+    AsyncWrapper._event_loop = None
+    AsyncWrapper._event_loop_thread = None
+    AsyncWrapper._loop_started.clear()
+
     for pool in AsyncWrapper._pool.values():
       pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
           wait=True, cancel_futures=True)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `reset_state` method modifies and accesses class-level variables 
(`_event_loop`, `_event_loop_thread`, `_loop_started`) that are also used in 
the `setup` method. To prevent race conditions, especially in multi-threaded 
environments or during test execution, this method should acquire 
`AsyncWrapper._lock` before modifying these shared resources.
   
   ```suggestion
     @staticmethod
     def reset_state():
       with AsyncWrapper._lock:
         if AsyncWrapper._event_loop:
           AsyncWrapper._event_loop.call_soon_threadsafe(
               AsyncWrapper._event_loop.stop)
         if AsyncWrapper._event_loop_thread:
           AsyncWrapper._event_loop_thread.join()
   
         AsyncWrapper._event_loop = None
         AsyncWrapper._event_loop_thread = None
         AsyncWrapper._loop_started.clear()
   
         for pool in AsyncWrapper._pool.values():
           pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
               wait=True, cancel_futures=True)
   ```
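
As a standalone illustration of the locking pattern suggested above, here is a minimal sketch. `LoopHolder` and its attribute names are hypothetical stand-ins for the class-level state on `AsyncWrapper`, and the use of a plain `threading.Lock` is an assumption, since the type of `AsyncWrapper._lock` is not shown in this hunk.

```python
import asyncio
import threading


class LoopHolder:
  """Hypothetical stand-in for the class-level state on AsyncWrapper."""
  _lock = threading.Lock()  # assumption: the real lock type is not shown
  _loop = None
  _thread = None
  _started = threading.Event()

  @staticmethod
  def _run():
    # Runs on the background thread; it never takes _lock itself.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    LoopHolder._loop = loop
    LoopHolder._started.set()
    loop.run_forever()
    loop.close()

  @staticmethod
  def setup():
    # Start the loop thread at most once, under the lock.
    with LoopHolder._lock:
      if LoopHolder._thread is None:
        LoopHolder._thread = threading.Thread(
            target=LoopHolder._run, daemon=True)
        LoopHolder._thread.start()
        LoopHolder._started.wait()

  @staticmethod
  def reset_state():
    # Tear down under the same lock so setup() cannot interleave.
    with LoopHolder._lock:
      if LoopHolder._loop is not None:
        LoopHolder._loop.call_soon_threadsafe(LoopHolder._loop.stop)
      if LoopHolder._thread is not None:
        LoopHolder._thread.join()
      LoopHolder._loop = None
      LoopHolder._thread = None
      LoopHolder._started.clear()
```

Joining the thread while holding the lock is only safe here because the loop thread never acquires that lock. If code running on the loop could take it, the join would need to happen outside the `with` block.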



##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -190,6 +228,52 @@ def sync_fn_process(self, element, *args, **kwargs):
 
     return to_return
 
+  async def async_fn_process(self, element, *args, **kwargs):
+    """Makes the call to the wrapped dofn's start_bundle, process
+    and finish_bundle methods for asynchronous DoFns.
+
+    Args:
+      element: The element to process.
+      *args: Any additional arguments to pass to the wrapped dofn's process
+        method.
+      **kwargs: Any additional keyword arguments to pass to the wrapped dofn's
+        process method.
+
+    Returns:
+      A list of elements produced by the input element.
+    """
+    self._sync_fn.start_bundle()
+    process_result = self._sync_fn.process(element, *args, **kwargs)
+    bundle_result = self._sync_fn.finish_bundle()
+
+    if not process_result:
+      process_result = []
+    elif isinstance(process_result, AsyncIterable):
+      temp = []
+      async for item in process_result:
+        temp.append(item)
+      process_result = temp
+    elif not isinstance(process_result, (GeneratorType, Iterable)):
+      process_result = [process_result]
+
+    if not bundle_result:
+      bundle_result = []
+    elif isinstance(bundle_result, AsyncIterable):
+      temp = []
+      async for item in bundle_result:
+        temp.append(item)
+      bundle_result = temp
+    elif not isinstance(bundle_result, (GeneratorType, Iterable)):
+      bundle_result = [bundle_result]
+
+    to_return = []
+    for x in process_result:
+      to_return.append(x)
+    for x in bundle_result:
+      to_return.append(x)
+
+    return to_return

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The current implementation of `async_fn_process` has two significant issues:
   
   1. **Missing `await` for Coroutines**: If the wrapped DoFn methods 
(`process`, `finish_bundle`) are defined as `async def`, they return coroutine 
objects. These are currently not being awaited, meaning the coroutines 
themselves (rather than their results) will be returned, which is incorrect.
   2. **Blocking the Event Loop**: If the wrapped DoFn is synchronous and 
performs blocking I/O, calling it directly on the event loop thread will block 
the entire loop, serializing all processing and defeating the purpose of using 
`asyncio` for parallelism.
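
The first issue can be reproduced in isolation. The sketch below uses a hypothetical `AsyncFn` class (not part of the PR) to show that calling an `async def` method without `await` hands back a coroutine object, and that checking with `inspect.isawaitable` before awaiting recovers the actual result.

```python
import asyncio
import inspect


class AsyncFn:
  """Hypothetical DoFn-like class with an async process method."""
  async def process(self, element):
    await asyncio.sleep(0)
    return [element * 2]


async def call_without_await(fn, element):
  # Returns the coroutine object itself, not its result.
  return fn.process(element)


async def call_with_await(fn, element):
  result = fn.process(element)
  # Await only when the call actually produced an awaitable.
  if inspect.isawaitable(result):
    result = await result
  return result


async def main():
  fn = AsyncFn()
  pending = await call_without_await(fn, 3)
  is_coro = asyncio.iscoroutine(pending)
  pending.close()  # suppress the "never awaited" warning
  value = await call_with_await(fn, 3)
  return is_coro, value


is_coro, value = asyncio.run(main())
```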
   
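   The suggested refactoring awaits coroutine results, drains async iterables, 
and simplifies the result collection logic. It does not by itself address the 
blocking concern: a synchronous DoFn would still run on the event loop thread.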
   
   ```python
     async def async_fn_process(self, element, *args, **kwargs):
       """Makes the call to the wrapped dofn's start_bundle, process
       and finish_bundle methods for asynchronous DoFns.
   
       Args:
         element: The element to process.
         *args: Any additional arguments to pass to the wrapped dofn's process
           method.
         **kwargs: Any additional keyword arguments to pass to the wrapped
           dofn's process method.
   
       Returns:
         A list of elements produced by the input element.
       """
       async def _collect(result):
         if result is None:
           return []
         if asyncio.iscoroutine(result):
           result = await result
         if isinstance(result, AsyncIterable):
           return [item async for item in result]
         if isinstance(result, (GeneratorType, Iterable)) and not isinstance(
             result, (str, bytes)):
           return list(result)
         return [result]
   
       start_res = self._sync_fn.start_bundle()
       if asyncio.iscoroutine(start_res):
         await start_res
   
       process_result = await _collect(
           self._sync_fn.process(element, *args, **kwargs))
       bundle_result = await _collect(self._sync_fn.finish_bundle())
   
       return process_result + bundle_result
   ```
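
For the second issue, one possible approach (an assumption, not something this PR implements) is to hand synchronous calls to an executor with `loop.run_in_executor`, so the event loop thread stays free. `blocking_process` below is a hypothetical placeholder for a synchronous `process` method that performs blocking I/O.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_process(element):
  """Hypothetical synchronous process method doing blocking I/O."""
  time.sleep(0.2)
  return [element]


async def process_offloaded(executor, element):
  # Run the blocking call on a worker thread instead of the loop thread.
  loop = asyncio.get_running_loop()
  return await loop.run_in_executor(executor, blocking_process, element)


async def main():
  with ThreadPoolExecutor(max_workers=4) as executor:
    start = time.monotonic()
    # The four calls overlap on worker threads rather than running serially.
    results = await asyncio.gather(
        *(process_offloaded(executor, i) for i in range(4)))
    elapsed = time.monotonic() - start
  return results, elapsed


results, elapsed = asyncio.run(main())
```

`asyncio.gather` returns results in submission order, so `results` lines up with the inputs even though the calls finish concurrently.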



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
