damccorm commented on code in PR #38262:
URL: https://github.com/apache/beam/pull/38262#discussion_r3157051308


##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -190,6 +228,52 @@ def sync_fn_process(self, element, *args, **kwargs):
 
     return to_return
 
+  async def async_fn_process(self, element, *args, **kwargs):
+    """Makes the call to the wrapped dofn's start_bundle, process
+    and finish_bundle methods for asynchronous DoFns.
+
+    Args:
+      element: The element to process.
+      *args: Any additional arguments to pass to the wrapped dofn's process
+        method.
+      **kwargs: Any additional keyword arguments to pass to the wrapped dofn's
+        process method.
+
+    Returns:
+      A list of elements produced by the input element.
+    """
+    self._sync_fn.start_bundle()
+    process_result = self._sync_fn.process(element, *args, **kwargs)
+    bundle_result = self._sync_fn.finish_bundle()
+
+    if not process_result:

Review Comment:
   Optional nit: thoughts on wrapping this in a small local function so that we just need to call something like:
   
   ```
   process_result = gather_async_results(process_result)
   bundle_result = gather_async_results(bundle_result)
   ```



##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -190,6 +228,52 @@ def sync_fn_process(self, element, *args, **kwargs):
 
     return to_return
 
+  async def async_fn_process(self, element, *args, **kwargs):
+    """Makes the call to the wrapped dofn's start_bundle, process
+    and finish_bundle methods for asynchronous DoFns.
+
+    Args:
+      element: The element to process.
+      *args: Any additional arguments to pass to the wrapped dofn's process
+        method.
+      **kwargs: Any additional keyword arguments to pass to the wrapped dofn's
+        process method.
+
+    Returns:
+      A list of elements produced by the input element.
+    """
+    self._sync_fn.start_bundle()
+    process_result = self._sync_fn.process(element, *args, **kwargs)
+    bundle_result = self._sync_fn.finish_bundle()
+
+    if not process_result:
+      process_result = []
+    elif isinstance(process_result, AsyncIterable):
+      temp = []
+      async for item in process_result:
+        temp.append(item)
+      process_result = temp
+    elif not isinstance(process_result, (GeneratorType, Iterable)):
+      process_result = [process_result]
+
+    if not bundle_result:
+      bundle_result = []
+    elif isinstance(bundle_result, AsyncIterable):
+      temp = []
+      async for item in bundle_result:
+        temp.append(item)
+      bundle_result = temp
+    elif not isinstance(bundle_result, (GeneratorType, Iterable)):
+      bundle_result = [bundle_result]
+
+    to_return = []
+    for x in process_result:
+      to_return.append(x)
+    for x in bundle_result:
+      to_return.append(x)
+
+    return to_return

Review Comment:
   Nit: this can be simplified since both of these are lists:
   
   ```suggestion
       return process_result + bundle_result
   ```
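One thing worth keeping in mind with this suggestion: in the quoted code, the `(GeneratorType, Iterable)` branch leaves generators and other non-list iterables unconverted, and `list + generator` raises `TypeError`. A minimal sketch, using a hypothetical `combine_results` helper, of a concatenation that also tolerates that case:

```python
import itertools


def combine_results(process_result, bundle_result):
  """Hypothetical helper: concatenates two DoFn results into one list.

  `+` only works when both operands are lists; list(itertools.chain(...))
  also accepts generators, tuples, and other iterables.
  """
  return list(itertools.chain(process_result, bundle_result))


# Both lists: same result as process_result + bundle_result.
print(combine_results([1, 2], [3]))  # [1, 2, 3]

# A generator on one side: `+` would raise TypeError, chain does not.
gen = (x * 10 for x in range(2))
print(combine_results(gen, [99]))  # [0, 10, 99]
```

If every branch already normalizes to a list (for example via `list(...)`), the plain `process_result + bundle_result` from the suggestion is sufficient.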



##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -104,6 +113,8 @@ def __init__(
         schedule an item.  Used in testing to ensure timeouts are met.
       id_fn: A function that returns a hashable object from an element. This
         will be used to track items instead of the element's default hash.
+      use_asyncio: If true, use asyncio and coroutines to process items. If
+        false, use ThreadPoolExecutor.

Review Comment:
   Regardless, this doc should explain why a user would choose one vs. the other (or, if it's almost always one, we should just make this a kwarg).
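As possible input for that doc text, here is a small, self-contained illustration of the general Python trade-off. These are not measurements from this PR or from Beam, and all function names are hypothetical. Coroutines only overlap while they `await` non-blocking I/O, and a blocking call inside a coroutine stalls the whole event loop. A ThreadPoolExecutor overlaps blocking calls at the cost of a thread per in-flight call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(delay):
  # Hypothetical stand-in for a synchronous client call (e.g. a blocking RPC).
  time.sleep(delay)
  return delay


async def awaitable_call(delay):
  # Hypothetical stand-in for an async client call that yields to the loop.
  await asyncio.sleep(delay)
  return delay


async def blocking_in_coroutine(delay):
  # A sync call inside a coroutine never yields, so the event loop is stuck.
  return blocking_call(delay)


def time_threads(n, delay):
  """Runs n blocking calls on n worker threads; returns elapsed seconds."""
  start = time.perf_counter()
  with ThreadPoolExecutor(max_workers=n) as pool:
    list(pool.map(blocking_call, [delay] * n))
  return time.perf_counter() - start


def time_asyncio(n, delay, coro_fn):
  """Runs n coroutines with asyncio.gather; returns elapsed seconds."""
  async def run_all():
    await asyncio.gather(*(coro_fn(delay) for _ in range(n)))

  start = time.perf_counter()
  asyncio.run(run_all())
  return time.perf_counter() - start


n, delay = 5, 0.1
# Expect roughly `delay` for the first two and roughly n * delay for the last.
print('threads, blocking:   %.2fs' % time_threads(n, delay))
print('asyncio, awaitable:  %.2fs' % time_asyncio(n, delay, awaitable_call))
print('asyncio, blocking:   %.2fs'
      % time_asyncio(n, delay, blocking_in_coroutine))
```

In doc terms, this suggests the asyncio mode fits a wrapped DoFn whose `process` awaits non-blocking I/O, and the thread pool mode fits one that makes blocking synchronous calls.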



##########
sdks/python/apache_beam/transforms/async_dofn_test.py:
##########
@@ -86,7 +86,9 @@ def set(self, time):
     self.time = time
 
 
-class AsyncTest(unittest.TestCase):
+class _AsyncTestBase:

Review Comment:
   Parametrizing the test class with pytest (https://stackoverflow.com/questions/38729007/parametrize-class-tests-with-pytest) is probably the more Pythonic way to do this.
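A minimal sketch of that approach, assuming `_AsyncTestBase` exists only so that each test runs once per `use_asyncio` mode. `process_items` is a hypothetical stand-in for building a pipeline with the wrapper; the real tests would pass `use_asyncio` through to the wrapper instead. One caveat: pytest's parametrization does not apply to `unittest.TestCase` subclasses, so the test class would have to become a plain pytest-style class.

```python
import pytest


def process_items(items, use_asyncio):
  """Hypothetical stand-in for running elements through the async wrapper.

  The real test would construct the wrapper with use_asyncio=use_asyncio;
  here it just doubles each item so the example runs on its own.
  """
  return [x * 2 for x in items]


# Every test method in the class runs once per value of use_asyncio, so no
# shared base class plus per-mode subclasses is needed.
@pytest.mark.parametrize("use_asyncio", [False, True])
class TestAsyncDoFn:
  def test_doubles_items(self, use_asyncio):
    assert process_items([1, 2, 3], use_asyncio) == [2, 4, 6]

  def test_empty_input(self, use_asyncio):
    assert process_items([], use_asyncio) == []
```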



##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -104,6 +113,8 @@ def __init__(
         schedule an item.  Used in testing to ensure timeouts are met.
       id_fn: A function that returns a hashable object from an element. This
         will be used to track items instead of the element's default hash.
+      use_asyncio: If true, use asyncio and coroutines to process items. If
+        false, use ThreadPoolExecutor.

Review Comment:
   In our tests, has asyncio generally performed better? Are there reasons to default it to off?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.