TheNeuralBit commented on a change in pull request #13333:
URL: https://github.com/apache/beam/pull/13333#discussion_r524496438



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -223,8 +235,12 @@ def expand(self, pcolls):
 
         # Actually evaluate the expressions.
         def evaluate(partition, stage=self.stage, **side_inputs):
+          def lookup(expr):
+            return expr.proxy(
+            ).iloc[:0] if partition[expr._id] is None else partition[expr._id]

Review comment:
       nit: maybe add a comment like this to clarify the intention
   
   ```suggestion
               # Use proxy if there's no data in this partition
               return expr.proxy(
               ).iloc[:0] if partition[expr._id] is None else partition[expr._id]
   ```
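
For anyone reading the thread without the surrounding code, here is a minimal sketch of the fallback the suggested comment describes, written against plain pandas rather than Beam's expression objects. The names `lookup`, `partition`, and `proxy` below are hypothetical stand-ins, and the sketch assumes a proxy is simply a DataFrame with the expected columns and dtypes; slicing it with `.iloc[:0]` gives an empty frame of the same shape.

```python
import pandas as pd


def lookup(partition, key, proxy):
    """Return the data stored under key, or an empty frame shaped like proxy.

    Hypothetical stand-in for the nested helper in the diff: partition is a
    plain dict here, and proxy is an ordinary DataFrame (an assumption).
    """
    value = partition[key]
    # Use the (emptied) proxy if there's no data in this partition.
    return proxy.iloc[:0] if value is None else value


# A proxy only needs the right columns and dtypes; its rows are discarded.
proxy = pd.DataFrame({"a": [0], "b": ["x"]})
partition = {
    "has_data": pd.DataFrame({"a": [1, 2], "b": ["u", "v"]}),
    "no_data": None,
}

full = lookup(partition, "has_data", proxy)
empty = lookup(partition, "no_data", proxy)
```

Downstream code can then treat both results the same way, since `empty` has the same columns as `full` but no rows.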

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -410,6 +426,35 @@ def _total_memory_usage(frame):
     float('inf')
 
 
+class _PreBatch(beam.DoFn):
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(list)
+    self._running_size = 0
+
+  def process(
+      self,
+      part,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    part_size = _total_memory_usage(part)
+    if part_size >= self._target_size:
+      yield part
+    else:
+      self._running_size += part_size
+      self._parts[window, timestamp].append(part)
+      if self._running_size >= self._target_size:
+        yield from self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          pd.concat(parts), timestamp, (window, ))
+    self.start_bundle()

Review comment:
       This is frustratingly similar to _ReBatch... but I don't see a reasonable way to combine them.
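
To make the comparison easier to follow, the buffering pattern in `_PreBatch` can be sketched outside of Beam roughly as below. This is a simplified illustration and not the PR's code: `prebatch` and `size_of` are hypothetical names, plain lists stand in for DataFrames, list concatenation stands in for `pd.concat`, and a single key stands in for the `(window, timestamp)` pair.

```python
import collections


def prebatch(parts, target_size, size_of=len):
    """Merge small (key, part) pairs until the buffered size hits target_size.

    Parts that are already at least target_size pass through unchanged.
    """
    buffered = collections.defaultdict(list)
    running_size = 0
    for key, part in parts:
        part_size = size_of(part)
        if part_size >= target_size:
            yield key, part
            continue
        buffered[key].append(part)
        running_size += part_size
        if running_size >= target_size:
            # Flush everything buffered so far, one merged part per key.
            for k, chunks in buffered.items():
                yield k, [x for chunk in chunks for x in chunk]
            buffered.clear()
            running_size = 0
    # End of input: flush the remainder (the analogue of finish_bundle).
    for k, chunks in buffered.items():
        yield k, [x for chunk in chunks for x in chunk]


batches = list(
    prebatch(
        [("w1", [1]), ("w1", [2, 3]), ("w2", [4, 5, 6, 7]), ("w1", [8])],
        target_size=3,
    )
)
```

With `target_size=3`, the first two `w1` parts are merged once their combined size reaches the target, the large `w2` part passes straight through, and the trailing `w1` part is emitted by the final flush.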




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

