robertwb commented on a change in pull request #13401:
URL: https://github.com/apache/beam/pull/13401#discussion_r528048272
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1401,17 +1403,20 @@ def replace(self, limit, **kwargs):
def reset_index(self, level=None, **kwargs):
if level is not None and not isinstance(level, (tuple, list)):
level = [level]
+
if level is None or len(level) == self._expr.proxy().index.nlevels:
# TODO: Could do distributed re-index with offsets.
requires_partition_by = partitionings.Singleton()
else:
requires_partition_by = partitionings.Nothing()
+
+
Review comment:
Did yapf suggest this?
##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -67,28 +72,57 @@ def is_scalar(expr):
result = super(PartitioningSession, self).evaluate(expr)
else:
scaler_args = [arg for arg in expr.args() if is_scalar(arg)]
- parts = collections.defaultdict(
- lambda: Session({arg: self.evaluate(arg)
- for arg in scaler_args}))
- for arg in expr.args():
- if not is_scalar(arg):
- input = self.evaluate(arg)
- for key, part in expr.requires_partition_by().test_partition_fn(
- input):
- parts[key]._bindings[arg] = part
- if not parts:
- parts[None] # Create at least one entry.
-
- results = []
- for session in parts.values():
- if any(len(session.lookup(arg)) for arg in expr.args()
- if not is_scalar(arg)):
- results.append(session.evaluate(expr))
- if results:
- result = pd.concat(results)
- else:
- # Choose any single session.
- result = next(iter(parts.values())).evaluate(expr)
+
+ def evaluate_with(input_partitioning):
+ parts = collections.defaultdict(
+ lambda: Session({arg: self.evaluate(arg)
+ for arg in scaler_args}))
+ for arg in expr.args():
+ if not is_scalar(arg):
+ input = self.evaluate(arg)
+ for key, part in input_partitioning.test_partition_fn(input):
+ parts[key]._bindings[arg] = part
+ if not parts:
+ parts[None] # Create at least one entry.
+
+ results = []
+ for session in parts.values():
+ if any(len(session.lookup(arg)) for arg in expr.args()
+ if not is_scalar(arg)):
+ results.append(session.evaluate(expr))
+
+ expected_output_partitioning = expr.preserves_partition_by(
+ ) if input_partitioning.is_subpartitioning_of(
+ expr.preserves_partition_by()) else input_partitioning
+
+ if not expected_output_partitioning.check(results):
+ raise AssertionError(
+ f"""Expression does not preserve partitioning!
+ Expression: {expr}
+ Requires: {expr.requires_partition_by()}
+ Preserves: {expr.preserves_partition_by()}
+ Input partitioning: {input_partitioning}
+ Expected output partitioning: {expected_output_partitioning}
+ """)
+
+ if results:
+ return pd.concat(results)
+ else:
+ # Choose any single session.
+ return next(iter(parts.values())).evaluate(expr)
+
+ input_partitioning = expr.requires_partition_by()
+
+ while input_partitioning is not None:
+ result = evaluate_with(input_partitioning)
+
+ if input_partitioning == partitionings.Nothing():
Review comment:
This loop isn't obvious to follow. Perhaps iterate over
set([input_partitioning, Nothing(), Index(), Singleton()]) and continue in the
cases where it's not a subpartition of index_partitioning.
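
Roughly, something like the sketch below, reusing evaluate_with, expr, and the
partitionings classes from the diff above. The exact membership test (here
against expr.requires_partition_by(), and the direction of
is_subpartitioning_of) is only a guess for illustration:

    # Try every candidate partitioning that satisfies the expression's
    # requirement; each pass re-runs the output-partitioning check.
    for input_partitioning in set([expr.requires_partition_by(),
                                   partitionings.Nothing(),
                                   partitionings.Index(),
                                   partitionings.Singleton()]):
      if not input_partitioning.is_subpartitioning_of(
          expr.requires_partition_by()):
        continue  # Candidate does not satisfy the requirement; skip it.
      # All passes should produce equivalent results, so keeping the last
      # one as `result` is fine.
      result = evaluate_with(input_partitioning)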
##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -115,6 +115,23 @@ def partition_fn(self, df, num_partitions):
for key in range(num_partitions):
yield key, df[hashes % num_partitions == key]
+ def check(self, dfs):
Review comment:
This isn't a very strong check if the dataframes are fairly sparse (as
they are in most examples). We could try concat + repartition and verify the
results are the same.
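
Roughly, something like this (pandas assumed imported as pd, as elsewhere in
the module; matching the repartitioned pieces back to the inputs by sorting on
the first index value is only for illustration):

    def check(self, dfs):
      # Drop empty inputs, then rebuild the whole frame and re-apply this
      # partitioning to it.
      dfs = [df for df in dfs if len(df)]
      if not dfs:
        return True
      full = pd.concat(dfs)
      reparts = [
          part for _, part in self.partition_fn(full, len(dfs)) if len(part)
      ]

      def ordered(parts):
        # Sort rows within each part, and the parts themselves, so the
        # comparison does not depend on partition order.
        return sorted((p.sort_index() for p in parts),
                      key=lambda p: str(p.index[0]))

      dfs, reparts = ordered(dfs), ordered(reparts)
      return len(dfs) == len(reparts) and all(
          a.equals(b) for a, b in zip(dfs, reparts))

In practice the original number of partitions would need to be threaded
through, since hash partitioning with a different partition count will not
reproduce the same pieces.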
##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -67,28 +72,57 @@ def is_scalar(expr):
result = super(PartitioningSession, self).evaluate(expr)
else:
scaler_args = [arg for arg in expr.args() if is_scalar(arg)]
- parts = collections.defaultdict(
- lambda: Session({arg: self.evaluate(arg)
- for arg in scaler_args}))
- for arg in expr.args():
- if not is_scalar(arg):
- input = self.evaluate(arg)
- for key, part in expr.requires_partition_by().test_partition_fn(
- input):
- parts[key]._bindings[arg] = part
- if not parts:
- parts[None] # Create at least one entry.
-
- results = []
- for session in parts.values():
- if any(len(session.lookup(arg)) for arg in expr.args()
- if not is_scalar(arg)):
- results.append(session.evaluate(expr))
- if results:
- result = pd.concat(results)
- else:
- # Choose any single session.
- result = next(iter(parts.values())).evaluate(expr)
+
+ def evaluate_with(input_partitioning):
+ parts = collections.defaultdict(
+ lambda: Session({arg: self.evaluate(arg)
+ for arg in scaler_args}))
+ for arg in expr.args():
+ if not is_scalar(arg):
+ input = self.evaluate(arg)
+ for key, part in input_partitioning.test_partition_fn(input):
+ parts[key]._bindings[arg] = part
+ if not parts:
+ parts[None] # Create at least one entry.
+
+ results = []
+ for session in parts.values():
+ if any(len(session.lookup(arg)) for arg in expr.args()
+ if not is_scalar(arg)):
+ results.append(session.evaluate(expr))
+
+ expected_output_partitioning = expr.preserves_partition_by(
+ ) if input_partitioning.is_subpartitioning_of(
+ expr.preserves_partition_by()) else input_partitioning
+
+ if not expected_output_partitioning.check(results):
+ raise AssertionError(
+ f"""Expression does not preserve partitioning!
+ Expression: {expr}
+ Requires: {expr.requires_partition_by()}
+ Preserves: {expr.preserves_partition_by()}
+ Input partitioning: {input_partitioning}
+ Expected output partitioning: {expected_output_partitioning}
+ """)
+
+ if results:
+ return pd.concat(results)
+ else:
+ # Choose any single session.
+ return next(iter(parts.values())).evaluate(expr)
+
+ input_partitioning = expr.requires_partition_by()
+
+ while input_partitioning is not None:
+ result = evaluate_with(input_partitioning)
+
+ if input_partitioning == partitionings.Nothing():
+ input_partitioning = partitionings.Index()
+ elif isinstance(input_partitioning, partitionings.Index):
+ input_partitioning = partitionings.Singleton()
+ else: # partitionings.Singleton()
Review comment:
Well, it could be something different...
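
One way to make the assumption explicit instead of leaving it in a comment
(sketch only; the structure and error message are illustrative, not part of
this PR):

    if input_partitioning == partitionings.Nothing():
      input_partitioning = partitionings.Index()
    elif isinstance(input_partitioning, partitionings.Index):
      input_partitioning = partitionings.Singleton()
    elif isinstance(input_partitioning, partitionings.Singleton):
      input_partitioning = None  # No further partitionings to try.
    else:
      raise ValueError(
          'Unexpected input partitioning: %r' % input_partitioning)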
##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -48,10 +48,15 @@ def lookup(self, expr): # type: (Expression) -> Any
class PartitioningSession(Session):
"""An extension of Session that enforces actual partitioning of inputs.
- When evaluating an expression, inputs are partitioned according to its
- `requires_partition_by` specifications, the expression is evaluated on each
- partition separately, and the final result concatinated, as if this were
- actually executed in a parallel manner.
+ Each expression is evaluated multiple times for various supported
Review comment:
I wonder how expensive this will get. Hopefully not too bad. It could,
however, mess up anything that depends on the random seed.
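
If the repeated evaluation does interfere with anything seed-dependent, one
possible mitigation (purely a sketch, not part of this PR; the wrapper and its
name are hypothetical) is to pin the random state before each pass so every
candidate partitioning sees the same sequence:

    import random

    import numpy as np

    def evaluate_with_fixed_seed(evaluate_with, input_partitioning, seed=0):
      # Reset both the stdlib and numpy RNGs so each evaluation pass starts
      # from the same state; evaluate_with is the helper from the diff above.
      random.seed(seed)
      np.random.seed(seed)
      return evaluate_with(input_partitioning)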