robertwb commented on a change in pull request #11974:
URL: https://github.com/apache/beam/pull/11974#discussion_r462445276
##########
File path: sdks/python/apache_beam/dataframe/frame_base.py
##########
@@ -41,16 +42,34 @@ def wrapper(deferred_type):
@classmethod
def wrap(cls, expr):
- return cls._pandas_type_map[type(expr.proxy())](expr)
+ proxy_type = type(expr.proxy())
+ if proxy_type in cls._pandas_type_map:
+ wrapper_type = cls._pandas_type_map[proxy_type]
+ else:
+ if expr.requires_partition_by() != partitionings.Singleton():
+ raise ValueError(
+ 'Scalar expression %s partitioned by non-singleton %s' %
+ (expr, expr.requires_partition_by()))
+ wrapper_type = _DeferredScaler
+ return wrapper_type(expr)
def _elementwise(self, func, name=None, other_args=(), inplace=False):
return _elementwise_function(func, name, inplace=inplace)(self,
*other_args)
+
+class DeferredFrame(DeferredBase):
@property
def dtypes(self):
return self._expr.proxy().dtypes
+class _DeferredScaler(DeferredBase):
Review comment:
Yes, thanks.
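For anyone reading along, a rough illustration of how the dispatch above behaves; `StubExpression` is a made-up stand-in just for this sketch, not part of the PR:
```python
class StubExpression(object):
  """Made-up stand-in exposing only what DeferredBase.wrap() inspects."""
  def __init__(self, proxy, partitioning):
    self._proxy = proxy
    self._partitioning = partitioning

  def proxy(self):
    return self._proxy

  def requires_partition_by(self):
    return self._partitioning

# wrap(StubExpression(pd.DataFrame(...), Nothing()))  -> a DeferredFrame
# wrap(StubExpression(3.14, Singleton()))             -> the scalar wrapper
# wrap(StubExpression(3.14, Nothing()))               -> ValueError, since a
#     scalar-proxied expression must be partitioned by Singleton.
```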
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
+
+ all = frame_base._associative_agg_method('all')
+ any = frame_base._associative_agg_method('any')
+
+ cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+ 'order-sensitive')
+ diff = frame_base.wont_implement_method('order-sensitive')
+
+ max = frame_base._associative_agg_method('max')
+ min = frame_base._associative_agg_method('min')
+ mode = frame_base._agg_method('mode')
+
+ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False, *args, **kwargs):
+ # TODO(robertwb): This is a common pattern. Generalize?
+ if axis == 1:
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dropna',
+ lambda df: df.dropna(axis, how, thresh, subset, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+ 'non-lazy')
+
+ isna = frame_base._elementwise_method('isna')
+ notnull = notna = frame_base._elementwise_method('notna')
+
+ prod = product = frame_base._associative_agg_method('prod')
+
+ def quantile(self, q=0.5, axis=0, *args, **kwargs):
+ if axis != 0:
+ raise frame_base.WontImplementError('non-deferred column values')
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'quantile',
+ lambda df: df.quantile(q, axis, *args, **kwargs),
+ [self._expr],
+ #TODO(robertwb): Approximate quantiles?
+ requires_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Singleton()))
+
+ query = frame_base._elementwise_method('query')
+
+ def replace(self, to_replace=None,
+ value=None,
+ inplace=False,
+ limit=None, *args, **kwargs):
+ if limit is None:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'replace',
+ lambda df: df.replace(to_replace, value, False, limit, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+ if level is not None and not isinstance(level, (tuple, list)):
+ level = [level]
+ if level is None or len(level) == len(self._expr.proxy().index.levels):
+ # TODO: Could do distributed re-index with offsets.
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'reset_index',
+ lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ round = frame_base._elementwise_method('round')
+ select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+ def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+ if axis == 1:
Review comment:
Fixed several occurrences.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
Review comment:
Yeah. Done.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
+
+ all = frame_base._associative_agg_method('all')
+ any = frame_base._associative_agg_method('any')
+
+ cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+ 'order-sensitive')
+ diff = frame_base.wont_implement_method('order-sensitive')
+
+ max = frame_base._associative_agg_method('max')
+ min = frame_base._associative_agg_method('min')
+ mode = frame_base._agg_method('mode')
+
+ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False, *args, **kwargs):
+ # TODO(robertwb): This is a common pattern. Generalize?
+ if axis == 1:
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dropna',
+ lambda df: df.dropna(axis, how, thresh, subset, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+ 'non-lazy')
+
+ isna = frame_base._elementwise_method('isna')
+ notnull = notna = frame_base._elementwise_method('notna')
+
+ prod = product = frame_base._associative_agg_method('prod')
+
+ def quantile(self, q=0.5, axis=0, *args, **kwargs):
+ if axis != 0:
+ raise frame_base.WontImplementError('non-deferred column values')
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'quantile',
+ lambda df: df.quantile(q, axis, *args, **kwargs),
+ [self._expr],
+ #TODO(robertwb): Approximate quantiles?
+ requires_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Singleton()))
+
+ query = frame_base._elementwise_method('query')
+
+ def replace(self, to_replace=None,
+ value=None,
+ inplace=False,
+ limit=None, *args, **kwargs):
+ if limit is None:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'replace',
+ lambda df: df.replace(to_replace, value, False, limit, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+ if level is not None and not isinstance(level, (tuple, list)):
+ level = [level]
+ if level is None or len(level) == len(self._expr.proxy().index.levels):
+ # TODO: Could do distributed re-index with offsets.
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'reset_index',
+ lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ round = frame_base._elementwise_method('round')
+ select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+ def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'shift',
+ lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+
+ @property
+ def shape(self):
+ raise frame_base.WontImplementError('scalar value')
+
+ def sort_values(self, by, axis=0, ascending=True, inplace=False, *args, **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
Review comment:
Yes, I was thinking of using context managers to allow/disallow this.
(Future PR.)
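To make that concrete, something along these lines is what I have in mind; purely a sketch, all names are provisional:
```python
import contextlib
import threading

_non_parallel = threading.local()

def _non_parallel_allowed():
  return getattr(_non_parallel, 'value', False)

@contextlib.contextmanager
def allow_non_parallel_operations(allow=True):
  """Temporarily permit operations that force Singleton partitioning."""
  old_value = _non_parallel_allowed()
  _non_parallel.value = allow
  try:
    yield
  finally:
    _non_parallel.value = old_value

# Hypothetical use inside methods like sort_values:
#   if (requires_partition_by == partitionings.Singleton()
#       and not _non_parallel_allowed()):
#     raise frame_base.WontImplementError('non-parallelizable')
```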
##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -34,77 +34,54 @@ def test_dataframe_tests(self):
'pandas.core.frame.DataFrame.T': ['*'],
'pandas.core.frame.DataFrame.agg': ['*'],
'pandas.core.frame.DataFrame.aggregate': ['*'],
- 'pandas.core.frame.DataFrame.all': ['*'],
- 'pandas.core.frame.DataFrame.any': ['*'],
'pandas.core.frame.DataFrame.append': ['*'],
'pandas.core.frame.DataFrame.apply': ['*'],
- 'pandas.core.frame.DataFrame.applymap': ['*'],
+ 'pandas.core.frame.DataFrame.applymap': ['df ** 2'],
'pandas.core.frame.DataFrame.assign': ['*'],
'pandas.core.frame.DataFrame.axes': ['*'],
'pandas.core.frame.DataFrame.combine': ['*'],
'pandas.core.frame.DataFrame.combine_first': ['*'],
'pandas.core.frame.DataFrame.corr': ['*'],
'pandas.core.frame.DataFrame.count': ['*'],
'pandas.core.frame.DataFrame.cov': ['*'],
- 'pandas.core.frame.DataFrame.cummax': ['*'],
- 'pandas.core.frame.DataFrame.cummin': ['*'],
- 'pandas.core.frame.DataFrame.cumprod': ['*'],
- 'pandas.core.frame.DataFrame.cumsum': ['*'],
- 'pandas.core.frame.DataFrame.diff': ['*'],
'pandas.core.frame.DataFrame.dot': ['*'],
'pandas.core.frame.DataFrame.drop': ['*'],
- 'pandas.core.frame.DataFrame.dropna': ['*'],
'pandas.core.frame.DataFrame.eval': ['*'],
'pandas.core.frame.DataFrame.explode': ['*'],
'pandas.core.frame.DataFrame.fillna': ['*'],
'pandas.core.frame.DataFrame.info': ['*'],
'pandas.core.frame.DataFrame.isin': ['*'],
- 'pandas.core.frame.DataFrame.isna': ['*'],
- 'pandas.core.frame.DataFrame.isnull': ['*'],
- 'pandas.core.frame.DataFrame.items': ['*'],
- 'pandas.core.frame.DataFrame.iteritems': ['*'],
- 'pandas.core.frame.DataFrame.iterrows': ['*'],
- 'pandas.core.frame.DataFrame.itertuples': ['*'],
+ 'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
'pandas.core.frame.DataFrame.join': ['*'],
- 'pandas.core.frame.DataFrame.max': ['*'],
'pandas.core.frame.DataFrame.melt': ['*'],
'pandas.core.frame.DataFrame.memory_usage': ['*'],
'pandas.core.frame.DataFrame.merge': ['*'],
- 'pandas.core.frame.DataFrame.min': ['*'],
- 'pandas.core.frame.DataFrame.mode': ['*'],
+ # Not equal to df.agg('mode', axis='columns', numeric_only=True)
+ 'pandas.core.frame.DataFrame.mode': [
+ "df.mode(axis='columns', numeric_only=True)"
+ ],
'pandas.core.frame.DataFrame.nlargest': ['*'],
- 'pandas.core.frame.DataFrame.notna': ['*'],
- 'pandas.core.frame.DataFrame.notnull': ['*'],
'pandas.core.frame.DataFrame.nsmallest': ['*'],
'pandas.core.frame.DataFrame.nunique': ['*'],
'pandas.core.frame.DataFrame.pivot': ['*'],
'pandas.core.frame.DataFrame.pivot_table': ['*'],
- 'pandas.core.frame.DataFrame.prod': ['*'],
- 'pandas.core.frame.DataFrame.product': ['*'],
- 'pandas.core.frame.DataFrame.quantile': ['*'],
'pandas.core.frame.DataFrame.query': ['*'],
'pandas.core.frame.DataFrame.reindex': ['*'],
'pandas.core.frame.DataFrame.reindex_axis': ['*'],
'pandas.core.frame.DataFrame.rename': ['*'],
- 'pandas.core.frame.DataFrame.replace': ['*'],
- 'pandas.core.frame.DataFrame.reset_index': ['*'],
+ # Raises right exception, but testing framework has matching issues.
+ 'pandas.core.frame.DataFrame.replace': [
+ "df.replace({'a string': 'new value', True: False}) # raises"
+ ],
+ # Uses unseeded np.random.
'pandas.core.frame.DataFrame.round': ['*'],
- 'pandas.core.frame.DataFrame.select_dtypes': ['*'],
'pandas.core.frame.DataFrame.set_index': ['*'],
- 'pandas.core.frame.DataFrame.shape': ['*'],
- 'pandas.core.frame.DataFrame.shift': ['*'],
- 'pandas.core.frame.DataFrame.sort_values': ['*'],
- 'pandas.core.frame.DataFrame.stack': ['*'],
- 'pandas.core.frame.DataFrame.sum': ['*'],
- 'pandas.core.frame.DataFrame.to_dict': ['*'],
- 'pandas.core.frame.DataFrame.to_numpy': ['*'],
+ 'pandas.core.frame.DataFrame.transpose': [
+ 'df1_transposed.dtypes', 'df2_transposed.dtypes'
+ ],
+ 'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
+ # Uses df.index
Review comment:
Good idea. (One unfortunate thing is that each of these actually
implements multiple tests, but that just means the declarations might be overly
broad.)
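In other words, '*' skips every example in a docstring while a listed statement skips just that one; roughly (illustrative only, not the actual runner logic):
```python
def should_skip(skips, test_name, example_source):
  """Return True if this doctest example is matched by a skip map like above."""
  patterns = skips.get(test_name, [])
  return '*' in patterns or example_source.strip() in (
      p.strip() for p in patterns)

# should_skip(skips, 'pandas.core.frame.DataFrame.agg', "df.agg('min')")
#   -> True (wildcard entry), whereas for DataFrame.applymap only the
#   'df ** 2' example is skipped.
```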
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
+
+ all = frame_base._associative_agg_method('all')
+ any = frame_base._associative_agg_method('any')
+
+ cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+ 'order-sensitive')
+ diff = frame_base.wont_implement_method('order-sensitive')
+
+ max = frame_base._associative_agg_method('max')
+ min = frame_base._associative_agg_method('min')
+ mode = frame_base._agg_method('mode')
+
+ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False, *args, **kwargs):
+ # TODO(robertwb): This is a common pattern. Generalize?
+ if axis == 1:
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dropna',
+ lambda df: df.dropna(axis, how, thresh, subset, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+ 'non-lazy')
+
+ isna = frame_base._elementwise_method('isna')
+ notnull = notna = frame_base._elementwise_method('notna')
+
+ prod = product = frame_base._associative_agg_method('prod')
+
+ def quantile(self, q=0.5, axis=0, *args, **kwargs):
+ if axis != 0:
+ raise frame_base.WontImplementError('non-deferred column values')
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'quantile',
+ lambda df: df.quantile(q, axis, *args, **kwargs),
+ [self._expr],
+ #TODO(robertwb): Approximate quantiles?
+ requires_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Singleton()))
+
+ query = frame_base._elementwise_method('query')
+
+ def replace(self, to_replace=None,
+ value=None,
+ inplace=False,
+ limit=None, *args, **kwargs):
+ if limit is None:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'replace',
+ lambda df: df.replace(to_replace, value, False, limit, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+ if level is not None and not isinstance(level, (tuple, list)):
+ level = [level]
+ if level is None or len(level) == len(self._expr.proxy().index.levels):
+ # TODO: Could do distributed re-index with offsets.
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'reset_index',
+ lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ round = frame_base._elementwise_method('round')
+ select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+ def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'shift',
+ lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+
+ @property
+ def shape(self):
+ raise frame_base.WontImplementError('scalar value')
+
+ def sort_values(self, by, axis=0, ascending=True, inplace=False, *args, **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'sort_values',
+ lambda df: df.sort_values(by, axis, ascending, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ stack = frame_base._elementwise_method('stack')
+
+ sum = frame_base._associative_agg_method('sum')
+
+ def to_string(self, *args, **kwargs):
+ raise frame_base.WontImplementError('non-deferred value')
+
+ to_records = to_dict = to_numpy = to_string
Review comment:
Agreed. (I added this helper later.)
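For reference, the helper is just a tiny factory; a minimal sketch of the shape it takes (the real one in frame_base.py may differ in detail):
```python
class WontImplementError(NotImplementedError):
  """Raised for operations we deliberately choose not to support."""

def wont_implement_method(reason):
  """Return a method that always raises WontImplementError(reason)."""
  def wrapper(self, *args, **kwargs):
    raise WontImplementError(reason)
  return wrapper

# e.g. in the class body:
#   to_records = to_dict = to_numpy = to_string = wont_implement_method(
#       'non-deferred value')
```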
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]