damccorm commented on code in PR #29175:
URL: https://github.com/apache/beam/pull/29175#discussion_r1378066824


##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -728,6 +836,86 @@ def expand(self, pcoll):
               self._batch_size_estimator, self._element_size_fn))
 
 
[email protected]_input_types(T)
[email protected]_output_types(List[T])
+class StatefulBatchElements(PTransform):
+  """A Transform that batches elements for amortized processing.
+
+  This transform is designed to precede operations whose processing cost
+  is of the form
+
+      time = fixed_cost + num_elements * per_element_cost
+
+  where the per element cost is (often significantly) smaller than the fixed
+  cost and could be amortized over multiple elements. It consumes a
+  PCollection of element type T and produces a PCollection of element type
+  List[T].
+
+  This transform attempts to find the best batch size between the minimum
+  and maximum parameters by profiling the time taken by (fused) downstream
+  operations. For a fixed batch size, set the min and max to be equal.
+
+  Elements are batched per-window, and each batch is emitted in the window
+  corresponding to its contents, with a timestamp at the end of that window.
+
+  Args:
+    min_batch_size: (optional) the smallest size of a batch
+    max_batch_size: (optional) the largest size of a batch
+    target_batch_overhead: (optional) a target for fixed_cost / time,
+        as used in the formula above
+    target_batch_duration_secs: (optional) a target for total time per bundle,
+        in seconds, excluding fixed cost
+    target_batch_duration_secs_including_fixed_cost: (optional) a target for
+        total time per bundle, in seconds, including fixed cost
+    element_size_fn: (optional) A mapping of an element to its contribution to
+        batch size, defaulting to every element having size 1.  When provided,
+        attempts to provide batches of optimal total size which may consist of
+        a varying number of elements.
+    variance: (optional) the permitted (relative) amount of deviation from the
+        (estimated) ideal batch size used to produce a wider base for
+        linear interpolation
+    clock: (optional) an alternative to time.time for measuring the cost of
+        downstream operations (mostly for testing)
+    record_metrics: (optional) whether or not to record beam metrics on
+        distributions of the batch size. Defaults to True.
+  """
+  def __init__(
+      self,
+      min_batch_size=1,
+      max_batch_size=10000,
+      target_batch_overhead=.05,
+      target_batch_duration_secs=10,
+      target_batch_duration_secs_including_fixed_cost=None,
+      max_batch_duration_secs=100,

Review Comment:
   So to be clear, `max_batch_duration_secs` is the only additional parameter 
here, right? With that in mind, my vote would probably still be to make this 
part of `BatchElements`. I think we could drop in identical code, with the only 
difference being what we do in expand. One option would be to only use this 
implementation if `max_batch_duration_secs` is set (and make it default to None 
instead of 100). That would also give us the option to flip it to the default 
if we find that it performs as well/better later on (especially if we can 
detect that we're in a streaming pipeline).
   
   That allows users to avoid needing to pick between two seemingly identical 
batching algorithms.
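
   A minimal sketch of the dispatch being suggested, independent of Beam's 
actual code. The function name `choose_batching_impl` and the string labels are 
hypothetical, used only to illustrate branching on whether 
`max_batch_duration_secs` is set (defaulting to `None`):

```python
# Hypothetical sketch of the reviewer's suggestion: a single BatchElements
# could default max_batch_duration_secs to None and select the stateful
# implementation only when the user sets it. Names here are illustrative
# and are not Beam's API.

def choose_batching_impl(max_batch_duration_secs=None):
    """Return which batching strategy a combined BatchElements would use."""
    if max_batch_duration_secs is not None:
        # Opt-in: streaming-friendly stateful batching, bounded by a
        # maximum batch duration.
        return "stateful"
    # Default: the existing bundle-based BatchElements behavior.
    return "bundle-based"
```

   Flipping the default later (e.g. when a streaming pipeline is detected) 
would then only require changing the `None` default, without users ever 
choosing between two transforms.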


