robertwb commented on a change in pull request #13333:
URL: https://github.com/apache/beam/pull/13333#discussion_r524508497



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -410,6 +426,35 @@ def _total_memory_usage(frame):
     float('inf')
 
 
+class _PreBatch(beam.DoFn):
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(list)
+    self._running_size = 0
+
+  def process(
+      self,
+      part,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    part_size = _total_memory_usage(part)
+    if part_size >= self._target_size:
+      yield part
+    else:
+      self._running_size += part_size
+      self._parts[window, timestamp].append(part)
+      if self._running_size >= self._target_size:
+        yield from self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          pd.concat(parts), timestamp, (window, ))
+    self.start_bundle()

Review comment:
       Yeah, I was thinking exactly the same... 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to