This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ed7d0e  [BEAM-9547] Add several simple methods to dataframes. (#13157)
6ed7d0e is described below

commit 6ed7d0eb46cb8eb4af32565d7c6bc98eb0e2c3ea
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Oct 22 17:28:59 2020 -0700

    [BEAM-9547] Add several simple methods to dataframes. (#13157)
---
 sdks/python/apache_beam/dataframe/frames.py        | 112 ++++++++++++++-------
 .../apache_beam/dataframe/pandas_doctests_test.py  |  10 +-
 2 files changed, 77 insertions(+), 45 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py 
b/sdks/python/apache_beam/dataframe/frames.py
index d67c179..c3fc819 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -46,6 +46,33 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
+  @frame_base.maybe_inplace
+  def fillna(self, value, method, axis, **kwargs):
+    if method is not None and axis in (0, 'index'):
+      raise frame_base.WontImplementError('order-sensitive')
+    if isinstance(value, frame_base.DeferredBase):
+      value_expr = value._expr
+    else:
+      value_expr = expressions.ConstantExpression(value)
+    return frame_base.DeferredFrame.wrap(
+        # yapf: disable
+        expressions.ComputedExpression(
+            'fillna',
+            lambda df,
+            value: df.fillna(value, method=method, axis=axis, **kwargs),
+            [self._expr, value_expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=partitionings.Nothing()))
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def ffill(self, **kwargs):
+    return self.fillna(method='ffill', **kwargs)
+
+  pad = ffill
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
   def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
     if not as_index:
       raise NotImplementedError('groupby(as_index=False)')
@@ -141,6 +168,16 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
             preserves_partition_by=partitionings.Singleton()),
         kwargs)
 
+  abs = frame_base._elementwise_method('abs')
+  astype = frame_base._elementwise_method('astype')
+  copy = frame_base._elementwise_method('copy')
+
+  @property
+  def dtype(self):
+    return self._expr.proxy().dtype
+
+  dtypes = dtype
+
 
 @frame_base.DeferredFrame._register_for(pd.Series)
 class DeferredSeries(DeferredDataFrameOrSeries):
@@ -163,7 +200,7 @@ class DeferredSeries(DeferredDataFrameOrSeries):
             preserves_partition_by=partitionings.Index()))
     return aligned.iloc[:, 0], aligned.iloc[:, 1]
 
-  astype = frame_base._elementwise_method('astype')
+  array = property(frame_base.wont_implement_method('non-deferred value'))
 
   between = frame_base._elementwise_method('between')
 
@@ -346,24 +383,6 @@ class DeferredSeries(DeferredDataFrameOrSeries):
   isna = frame_base._elementwise_method('isna')
   notnull = notna = frame_base._elementwise_method('notna')
 
-  @frame_base.args_to_kwargs(pd.Series)
-  @frame_base.populate_defaults(pd.Series)
-  @frame_base.maybe_inplace
-  def fillna(self, value, method):
-    if method is not None:
-      raise frame_base.WontImplementError('order-sensitive')
-    if isinstance(value, frame_base.DeferredBase):
-      value_expr = value._expr
-    else:
-      value_expr = expressions.ConstantExpression(value)
-    return frame_base.DeferredFrame.wrap(
-        expressions.ComputedExpression(
-            'fillna',
-            lambda df,
-            value: df.fillna(value, method=method), [self._expr, value_expr],
-            preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=partitionings.Nothing()))
-
   reindex = frame_base.not_implemented_method('reindex')
   rolling = frame_base.not_implemented_method('rolling')
 
@@ -401,6 +420,12 @@ class DeferredSeries(DeferredDataFrameOrSeries):
 
   agg = aggregate
 
+  @property
+  def axes(self):
+    return [self.index]
+
+  clip = frame_base._elementwise_method('clip')
+
   all = frame_base._agg_method('all')
   any = frame_base._agg_method('any')
   min = frame_base._agg_method('min')
@@ -414,6 +439,8 @@ class DeferredSeries(DeferredDataFrameOrSeries):
 
   head = tail = frame_base.wont_implement_method('order-sensitive')
 
+  filter = frame_base._elementwise_method('filter')
+
   memory_usage = frame_base.wont_implement_method('non-deferred value')
 
   # In Series __contains__ checks the index
@@ -464,6 +491,9 @@ class DeferredSeries(DeferredDataFrameOrSeries):
               preserves_partition_by=partitionings.Singleton(),
               requires_partition_by=partitionings.Singleton()))
 
+  plot = frame_base.wont_implement_method('plot')
+  pop = frame_base.wont_implement_method('non-lazy')
+
   rename_axis = frame_base._elementwise_method('rename_axis')
 
   @frame_base.args_to_kwargs(pd.Series)
@@ -610,6 +640,10 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
   def axes(self):
     return (self.index, self.columns)
 
+  @property
+  def dtypes(self):
+    return self._expr.proxy().dtypes
+
   def assign(self, **kwargs):
     for name, value in kwargs.items():
       if not callable(value) and not isinstance(value, DeferredSeries):
@@ -701,6 +735,9 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
   all = frame_base._agg_method('all')
   any = frame_base._agg_method('any')
 
+  clip = frame_base._elementwise_method(
+      'clip', restrictions={'axis': lambda axis: axis in (0, 'index')})
+
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
   def corr(self, method, min_periods):
@@ -842,25 +879,6 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
             preserves_partition_by=partitionings.Singleton(),
             requires_partition_by=requires_partition_by))
 
-  @frame_base.args_to_kwargs(pd.DataFrame)
-  @frame_base.populate_defaults(pd.DataFrame)
-  @frame_base.maybe_inplace
-  def fillna(self, value, method, axis, **kwargs):
-    if method is not None and axis in (0, 'index'):
-      raise frame_base.WontImplementError('order-sensitive')
-    if isinstance(value, frame_base.DeferredBase):
-      value_expr = value._expr
-    else:
-      value_expr = expressions.ConstantExpression(value)
-    return frame_base.DeferredFrame.wrap(
-        expressions.ComputedExpression(
-            'fillna',
-            lambda df, value: df.fillna(
-                value, method=method, axis=axis, **kwargs),
-            [self._expr, value_expr],
-            preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=partitionings.Nothing()))
-
   isna = frame_base._elementwise_method('isna')
   notnull = notna = frame_base._elementwise_method('notna')
 
@@ -1045,6 +1063,18 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
             preserves_partition_by=partitionings.Singleton(),
             requires_partition_by=requires_partition_by))
 
+  plot = frame_base.wont_implement_method('plot')
+
+  def pop(self, item):
+    result = self[item]
+    self._expr = expressions.ComputedExpression(
+            'popped',
+            lambda df: (df.pop(item), df)[-1],
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=partitionings.Nothing())
+    return result
+
   prod = product = frame_base._agg_method('prod')
 
   @frame_base.args_to_kwargs(pd.DataFrame)
@@ -1202,6 +1232,8 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
       requires_partition_by=partitionings.Index(),
       preserves_partition_by=partitionings.Index())
 
+  values = property(frame_base.wont_implement_method('non-deferred value'))
+
 
 for io_func in dir(io):
   if io_func.startswith('to_'):
@@ -1310,6 +1342,7 @@ class _DeferredGroupByCols(frame_base.DeferredFrame):
   all = frame_base._elementwise_method('all')
   apply = frame_base.not_implemented_method('apply')
   backfill = bfill = frame_base.not_implemented_method('backfill')
+  boxplot = frame_base.wont_implement_method('plot')
   corr = frame_base.not_implemented_method('corr')
   corrwith = frame_base.not_implemented_method('corrwith')
   cov = frame_base.not_implemented_method('cov')
@@ -1615,3 +1648,6 @@ for base in ['add',
 for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
   setattr(DeferredSeries, name, frame_base._elementwise_method(name))
   setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))
+
+DeferredSeries.multiply = DeferredSeries.mul  # type: ignore
+DeferredDataFrame.multiply = DeferredDataFrame.mul  # type: ignore
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py 
b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index dcbf7b5..c727054 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -136,7 +136,6 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.frame.DataFrame.duplicated': ['*'],
             'pandas.core.frame.DataFrame.idxmax': ['*'],
             'pandas.core.frame.DataFrame.idxmin': ['*'],
-            'pandas.core.frame.DataFrame.pop': ['*'],
             'pandas.core.frame.DataFrame.rename': [
                 # Returns deferred index.
                 'df.index',
@@ -188,10 +187,6 @@ class DoctestTest(unittest.TestCase):
             ],
             'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
 
-            # DeferredSeries has no attribute dtype. Should we allow this and
-            # defer to proxy?
-            'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
-
             # Skipped because "seen_wont_implement" is reset before getting to
             # these calls, so the NameError they raise is not ignored.
             'pandas.core.frame.DataFrame.T': [
@@ -210,6 +205,7 @@ class DoctestTest(unittest.TestCase):
         report=True,
         wont_implement_ok={
             'pandas.core.series.Series.__array__': ['*'],
+            'pandas.core.series.Series.array': ['*'],
             'pandas.core.series.Series.cummax': ['*'],
             'pandas.core.series.Series.cummin': ['*'],
             'pandas.core.series.Series.cumsum': ['*'],
@@ -232,6 +228,7 @@ class DoctestTest(unittest.TestCase):
                 "s.nsmallest(3)",
                 "s.nsmallest(3, keep='last')",
             ],
+            'pandas.core.series.Series.pop': ['*'],
             'pandas.core.series.Series.searchsorted': ['*'],
             'pandas.core.series.Series.shift': ['*'],
             'pandas.core.series.Series.take': ['*'],
@@ -250,7 +247,6 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.series.Series.reindex': ['*'],
         },
         skip={
-            'pandas.core.series.Series.array': ['*'],
             'pandas.core.series.Series.append': ['*'],
             'pandas.core.series.Series.argmax': ['*'],
             'pandas.core.series.Series.argmin': ['*'],
@@ -271,8 +267,8 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.series.Series.idxmin': ['*'],
             'pandas.core.series.Series.name': ['*'],
             'pandas.core.series.Series.nonzero': ['*'],
-            'pandas.core.series.Series.pop': ['*'],
             'pandas.core.series.Series.quantile': ['*'],
+            'pandas.core.series.Series.pop': ['ser'],  # testing side effect
             'pandas.core.series.Series.rename': ['*'],
             'pandas.core.series.Series.repeat': ['*'],
             'pandas.core.series.Series.replace': ['*'],

Reply via email to