TheNeuralBit commented on a change in pull request #14863:
URL: https://github.com/apache/beam/pull/14863#discussion_r638122103
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1871,12 +1871,15 @@ def clip(self, axis, **kwargs):
"""lower and upper must be :class:`DeferredSeries` instances, or constants.
Review comment:
Done, see comment above
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1871,12 +1871,15 @@ def clip(self, axis, **kwargs):
"""lower and upper must be :class:`DeferredSeries` instances, or constants.
array-like arguments are not supported as they are order-sensitive."""
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -173,10 +180,13 @@ def droplevel(self, level, axis):
preserves_partition_by=partitionings.Arbitrary()
if axis in (1, 'column') else partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def fillna(self, value, method, axis, limit, **kwargs):
+ """When axis="index", both method and limit must be None,
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -367,9 +384,14 @@ def map_index(df):
'astype', base=pd.core.generic.NDFrame)
copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def tz_localize(self, ambiguous, **kwargs):
+ """``ambiguous`` may not be set to 'infer' as it's semantics are
Review comment:
Shoot! This is one I try to always think about. Fixed
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1290,12 +1380,18 @@ def replace(self, to_replace, value, limit, method,
**kwargs):
to_frame = frame_base._elementwise_method('to_frame', base=pd.Series)
+ @frame_base.with_docs_from(pd.Series)
def unique(self, as_series=False):
+ """unique() is not supported by default because it produces a
+ non-deferred result, a numpy array. You may use the Beam-specific argument
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1592,10 +1690,16 @@ def func_elementwise(df):
self._expr = inserted._expr
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def duplicated(self, keep, subset):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -993,27 +1049,42 @@ def _wrap_in_df(self):
preserves_partition_by=partitionings.Arbitrary(),
))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def duplicated(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
+ this an order-sensitive operation. Note keep="any" is a Beam-specific
+ option that guarantees only one duplicate will be kept, but unlike "first"
+ and "last" it makes no guarantees about _which_ duplicate element is
+ kept."""
    # Re-use the DataFrame-based duplicated, extract the series back out
df = self._wrap_in_df()
return df.duplicated(keep=keep)[df.columns[0]]
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def drop_duplicates(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -14,6 +14,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Sources and sinks for the Beam DataFrame API.
+
+Sources
+#######
+This module provides analogs for pandas ``read`` methods, like
+:func:`pandas.read_csv`. However Beam sources like :func:`read_csv`
+create a Beam :class:`~apache_beam.PTransform`, and return a
+:class:`~apache_beam.dataframe.frames.DeferredDataFrame` or
+:class:`~apache_beam.dataframe.frames.DeferredSeries` representing the contents
+of the referenced file(s) or data source.
+
+The result of these methods must be applied to a :class:`~apache_beam.Pipeline`
+object, for example::
+
+ df = p | beam.dataframe.io.read_csv(...)
+
+Sinks
+#####
+This module also defines analogs for pandas sink, or ``to``, methods that
+generate a Beam :class:`~apache_beam.PTransform`. Generally these should be
Review comment:
Good point, I made this more explicit - "Users should prefer"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -115,10 +115,14 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
__array__ = frame_base.wont_implement_method(
pd.Series, '__array__', reason="non-deferred-result")
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def drop(self, labels, axis, index, columns, errors, **kwargs):
+ """drop is not parallelizable when dropping from the index and
+ errors="raise" specified. It requires collecting all data on a single node
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -92,6 +92,24 @@ def wrapper(self, *args, **kwargs):
frame_base.populate_defaults(pd.DataFrame)(wrapper)))
+LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'sum']
+LIFTABLE_WITH_SUM_AGGREGATIONS = ['size', 'count']
+UNLIFTABLE_AGGREGATIONS = ['mean', 'median', 'std', 'var']
+
+
+def _agg_method(base, func):
+ def wrapper(self, *args, **kwargs):
+ return self.agg(func, *args, **kwargs)
+
+ if func in UNLIFTABLE_AGGREGATIONS:
+ wrapper.__doc__ = (
+ f"``{func}`` cannot currently be parallelized, it will "
Review comment:
Fixed this here and everywhere else that I saw this pattern.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -367,9 +384,14 @@ def map_index(df):
'astype', base=pd.core.generic.NDFrame)
copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def tz_localize(self, ambiguous, **kwargs):
+ """``ambiguous`` may not be set to 'infer' as it's semantics are
Review comment:
Ah thanks for the explanation, this makes sense. Fixed.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -173,10 +180,13 @@ def droplevel(self, level, axis):
preserves_partition_by=partitionings.Arbitrary()
if axis in (1, 'column') else partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def fillna(self, value, method, axis, limit, **kwargs):
+ """When axis="index", both method and limit must be None,
+ otherwise this operation is order-sensitive."""
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -993,27 +1049,42 @@ def _wrap_in_df(self):
preserves_partition_by=partitionings.Arbitrary(),
))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def duplicated(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -367,9 +384,14 @@ def map_index(df):
'astype', base=pd.core.generic.NDFrame)
copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def tz_localize(self, ambiguous, **kwargs):
+ """``ambiguous`` may not be set to 'infer' as it's semantics are
+ order-sensitive. Similarly specifying ``ambiguous`` as an ndarray is
+ order-sensitive, but you can achieve similar functionality by specifying
+ ambiguous as a Series."""
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1163,9 +1235,15 @@ def axes(self):
__contains__ = frame_base.wont_implement_method(
pd.Series, '__contains__', reason="non-deferred-result")
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def nlargest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -993,27 +1049,42 @@ def _wrap_in_df(self):
preserves_partition_by=partitionings.Arbitrary(),
))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def duplicated(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
+ this an order-sensitive operation. Note keep="any" is a Beam-specific
+ option that guarantees only one duplicate will be kept, but unlike "first"
Review comment:
Yeah I was on the fence about this. But I think the argument against
backticks for me was really just laziness.
It makes sense to have a policy to use backticks in the docstrings for
anything that's referring to an argument or a python literal (so for string
literals we'll use backticks _and_ quotes). That will make it clear in the
rendered documentation that these things are references to code, not part of
the language. I went ahead and updated all the new docstrings to use
double-backticks in this way.
I'm not sure what to do about error messages. I don't think it's worth the
trouble to use backticks there since users should always see it rendered as
plaintext.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1190,9 +1268,15 @@ def nlargest(self, keep, **kwargs):
preserves_partition_by=partitionings.Arbitrary(),
requires_partition_by=partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def nsmallest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1248,10 +1333,15 @@ def set_index(s):
rename_axis = frame_base._elementwise_method('rename_axis', base=pd.Series)
+ @frame_base.with_docs_from(pd.Series, name='is_unique')
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def replace(self, to_replace, value, limit, method, **kwargs):
+ """`method` is not supported in the Beam DataFrame API because it is
+ order-sensitive, it must not be specified.
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1614,10 +1718,16 @@ def duplicated(self, keep, subset):
lambda df: pd.DataFrame(df.duplicated(keep=keep, subset=subset),
columns=[None]))[None]
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def drop_duplicates(self, keep, subset, ignore_index):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -2166,9 +2279,15 @@ def merge(
return merged.reset_index(drop=True)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nlargest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1290,12 +1380,18 @@ def replace(self, to_replace, value, limit, method,
**kwargs):
to_frame = frame_base._elementwise_method('to_frame', base=pd.Series)
+ @frame_base.with_docs_from(pd.Series)
def unique(self, as_series=False):
+ """unique() is not supported by default because it produces a
+ non-deferred result, a numpy array. You may use the Beam-specific argument
+ ``unique(as_series=True)`` to get the result as a
:class:`DeferredSeries`"""
+
if not as_series:
raise frame_base.WontImplementError(
"unique() is not supported by default because it produces a "
- "non-deferred result, a numpy array. You may call it with "
- "unique(as_series=True) to get the result as a DeferredSeries",
+ "non-deferred result, a numpy array. You may use the Beam-specific "
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -2192,9 +2311,15 @@ def nlargest(self, keep, **kwargs):
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nsmallest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]