TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r501261972



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
           partitioned_pcoll = next(pcolls.values()).pipeline | 
beam.Create([{}])
 
         elif self.stage.partitioning != partitionings.Nothing():
+          # Partitioning required for these operations.
+          # Compute the number of partitions to use based on estimated size.
+          if self.stage.partitioning == partitionings.Singleton():
+            # Always a single partition, don't waste time computing sizes.
+            num_partitions = 1
+          else:
+            # Estimate the sizes from the outputs of a *previous* stage such
+            # that using these estimates will not cause a fusion break.
+            input_sizes = [
+                estimate_size(input, same_stage_ok=False)
+                for input in tabular_inputs
+            ]
+            if None in input_sizes:
+              # We were unable to (cheaply) compute the size of one or more
+              # inputs.
+              num_partitions = DEFAULT_PARTITIONS
+            else:
+              num_partitions = beam.pvalue.AsSingleton(
+                  input_sizes
+                  | 'FlattenSizes' >> beam.Flatten()
+                  | 'SumSizes' >> beam.CombineGlobally(sum)

Review comment:
       This assumes that the size of this stage's output equals the sum of the 
sizes of its inputs, right? Could you document that here?

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.

Review comment:
       Did you do some benchmarks that motivated this?

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Should this actually be `TARGET_PARTITION_SIZE*num_partitions` (I 
believe that's possible), since it's the data limit across every partition?

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: 
collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       I'm curious whether it's always beneficial to use `pd.concat` here. I was 
under the impression that it copies and re-arranges buffers into columns.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to