This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6ed7d0e [BEAM-9547] Add several simple methods to dataframes. (#13157)
6ed7d0e is described below
commit 6ed7d0eb46cb8eb4af32565d7c6bc98eb0e2c3ea
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Oct 22 17:28:59 2020 -0700
[BEAM-9547] Add several simple methods to dataframes. (#13157)
---
sdks/python/apache_beam/dataframe/frames.py | 112 ++++++++++++++-------
.../apache_beam/dataframe/pandas_doctests_test.py | 10 +-
2 files changed, 77 insertions(+), 45 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frames.py
b/sdks/python/apache_beam/dataframe/frames.py
index d67c179..c3fc819 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -46,6 +46,33 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def fillna(self, value, method, axis, **kwargs):
+ if method is not None and axis in (0, 'index'):
+ raise frame_base.WontImplementError('order-sensitive')
+ if isinstance(value, frame_base.DeferredBase):
+ value_expr = value._expr
+ else:
+ value_expr = expressions.ConstantExpression(value)
+ return frame_base.DeferredFrame.wrap(
+ # yapf: disable
+ expressions.ComputedExpression(
+ 'fillna',
+ lambda df,
+ value: df.fillna(value, method=method, axis=axis, **kwargs),
+ [self._expr, value_expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=partitionings.Nothing()))
+
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ def ffill(self, **kwargs):
+ return self.fillna(method='ffill', **kwargs)
+
+ pad = ffill
+
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
if not as_index:
raise NotImplementedError('groupby(as_index=False)')
@@ -141,6 +168,16 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
preserves_partition_by=partitionings.Singleton()),
kwargs)
+ abs = frame_base._elementwise_method('abs')
+ astype = frame_base._elementwise_method('astype')
+ copy = frame_base._elementwise_method('copy')
+
+ @property
+ def dtype(self):
+ return self._expr.proxy().dtype
+
+ dtypes = dtype
+
@frame_base.DeferredFrame._register_for(pd.Series)
class DeferredSeries(DeferredDataFrameOrSeries):
@@ -163,7 +200,7 @@ class DeferredSeries(DeferredDataFrameOrSeries):
preserves_partition_by=partitionings.Index()))
return aligned.iloc[:, 0], aligned.iloc[:, 1]
- astype = frame_base._elementwise_method('astype')
+ array = property(frame_base.wont_implement_method('non-deferred value'))
between = frame_base._elementwise_method('between')
@@ -346,24 +383,6 @@ class DeferredSeries(DeferredDataFrameOrSeries):
isna = frame_base._elementwise_method('isna')
notnull = notna = frame_base._elementwise_method('notna')
- @frame_base.args_to_kwargs(pd.Series)
- @frame_base.populate_defaults(pd.Series)
- @frame_base.maybe_inplace
- def fillna(self, value, method):
- if method is not None:
- raise frame_base.WontImplementError('order-sensitive')
- if isinstance(value, frame_base.DeferredBase):
- value_expr = value._expr
- else:
- value_expr = expressions.ConstantExpression(value)
- return frame_base.DeferredFrame.wrap(
- expressions.ComputedExpression(
- 'fillna',
- lambda df,
- value: df.fillna(value, method=method), [self._expr, value_expr],
- preserves_partition_by=partitionings.Singleton(),
- requires_partition_by=partitionings.Nothing()))
-
reindex = frame_base.not_implemented_method('reindex')
rolling = frame_base.not_implemented_method('rolling')
@@ -401,6 +420,12 @@ class DeferredSeries(DeferredDataFrameOrSeries):
agg = aggregate
+ @property
+ def axes(self):
+ return [self.index]
+
+ clip = frame_base._elementwise_method('clip')
+
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')
min = frame_base._agg_method('min')
@@ -414,6 +439,8 @@ class DeferredSeries(DeferredDataFrameOrSeries):
head = tail = frame_base.wont_implement_method('order-sensitive')
+ filter = frame_base._elementwise_method('filter')
+
memory_usage = frame_base.wont_implement_method('non-deferred value')
# In Series __contains__ checks the index
@@ -464,6 +491,9 @@ class DeferredSeries(DeferredDataFrameOrSeries):
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
+ plot = frame_base.wont_implement_method('plot')
+ pop = frame_base.wont_implement_method('non-lazy')
+
rename_axis = frame_base._elementwise_method('rename_axis')
@frame_base.args_to_kwargs(pd.Series)
@@ -610,6 +640,10 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
def axes(self):
return (self.index, self.columns)
+ @property
+ def dtypes(self):
+ return self._expr.proxy().dtypes
+
def assign(self, **kwargs):
for name, value in kwargs.items():
if not callable(value) and not isinstance(value, DeferredSeries):
@@ -701,6 +735,9 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')
+ clip = frame_base._elementwise_method(
+ 'clip', restrictions={'axis': lambda axis: axis in (0, 'index')})
+
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def corr(self, method, min_periods):
@@ -842,25 +879,6 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
- @frame_base.args_to_kwargs(pd.DataFrame)
- @frame_base.populate_defaults(pd.DataFrame)
- @frame_base.maybe_inplace
- def fillna(self, value, method, axis, **kwargs):
- if method is not None and axis in (0, 'index'):
- raise frame_base.WontImplementError('order-sensitive')
- if isinstance(value, frame_base.DeferredBase):
- value_expr = value._expr
- else:
- value_expr = expressions.ConstantExpression(value)
- return frame_base.DeferredFrame.wrap(
- expressions.ComputedExpression(
- 'fillna',
- lambda df, value: df.fillna(
- value, method=method, axis=axis, **kwargs),
- [self._expr, value_expr],
- preserves_partition_by=partitionings.Singleton(),
- requires_partition_by=partitionings.Nothing()))
-
isna = frame_base._elementwise_method('isna')
notnull = notna = frame_base._elementwise_method('notna')
@@ -1045,6 +1063,18 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
+ plot = frame_base.wont_implement_method('plot')
+
+ def pop(self, item):
+ result = self[item]
+ self._expr = expressions.ComputedExpression(
+ 'popped',
+ lambda df: (df.pop(item), df)[-1],
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=partitionings.Nothing())
+ return result
+
prod = product = frame_base._agg_method('prod')
@frame_base.args_to_kwargs(pd.DataFrame)
@@ -1202,6 +1232,8 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
requires_partition_by=partitionings.Index(),
preserves_partition_by=partitionings.Index())
+ values = property(frame_base.wont_implement_method('non-deferred value'))
+
for io_func in dir(io):
if io_func.startswith('to_'):
@@ -1310,6 +1342,7 @@ class _DeferredGroupByCols(frame_base.DeferredFrame):
all = frame_base._elementwise_method('all')
apply = frame_base.not_implemented_method('apply')
backfill = bfill = frame_base.not_implemented_method('backfill')
+ boxplot = frame_base.wont_implement_method('plot')
corr = frame_base.not_implemented_method('corr')
corrwith = frame_base.not_implemented_method('corrwith')
cov = frame_base.not_implemented_method('cov')
@@ -1615,3 +1648,6 @@ for base in ['add',
for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
setattr(DeferredSeries, name, frame_base._elementwise_method(name))
setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))
+
+DeferredSeries.multiply = DeferredSeries.mul # type: ignore
+DeferredDataFrame.multiply = DeferredDataFrame.mul # type: ignore
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index dcbf7b5..c727054 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -136,7 +136,6 @@ class DoctestTest(unittest.TestCase):
'pandas.core.frame.DataFrame.duplicated': ['*'],
'pandas.core.frame.DataFrame.idxmax': ['*'],
'pandas.core.frame.DataFrame.idxmin': ['*'],
- 'pandas.core.frame.DataFrame.pop': ['*'],
'pandas.core.frame.DataFrame.rename': [
# Returns deferred index.
'df.index',
@@ -188,10 +187,6 @@ class DoctestTest(unittest.TestCase):
],
'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
- # DeferredSeries has no attribute dtype. Should we allow this and
- # defer to proxy?
- 'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
-
# Skipped because "seen_wont_implement" is reset before getting to
# these calls, so the NameError they raise is not ignored.
'pandas.core.frame.DataFrame.T': [
@@ -210,6 +205,7 @@ class DoctestTest(unittest.TestCase):
report=True,
wont_implement_ok={
'pandas.core.series.Series.__array__': ['*'],
+ 'pandas.core.series.Series.array': ['*'],
'pandas.core.series.Series.cummax': ['*'],
'pandas.core.series.Series.cummin': ['*'],
'pandas.core.series.Series.cumsum': ['*'],
@@ -232,6 +228,7 @@ class DoctestTest(unittest.TestCase):
"s.nsmallest(3)",
"s.nsmallest(3, keep='last')",
],
+ 'pandas.core.series.Series.pop': ['*'],
'pandas.core.series.Series.searchsorted': ['*'],
'pandas.core.series.Series.shift': ['*'],
'pandas.core.series.Series.take': ['*'],
@@ -250,7 +247,6 @@ class DoctestTest(unittest.TestCase):
'pandas.core.series.Series.reindex': ['*'],
},
skip={
- 'pandas.core.series.Series.array': ['*'],
'pandas.core.series.Series.append': ['*'],
'pandas.core.series.Series.argmax': ['*'],
'pandas.core.series.Series.argmin': ['*'],
@@ -271,8 +267,8 @@ class DoctestTest(unittest.TestCase):
'pandas.core.series.Series.idxmin': ['*'],
'pandas.core.series.Series.name': ['*'],
'pandas.core.series.Series.nonzero': ['*'],
- 'pandas.core.series.Series.pop': ['*'],
'pandas.core.series.Series.quantile': ['*'],
+ 'pandas.core.series.Series.pop': ['ser'], # testing side effect
'pandas.core.series.Series.rename': ['*'],
'pandas.core.series.Series.repeat': ['*'],
'pandas.core.series.Series.replace': ['*'],