This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d67d73b  [SPARK-35505][PYTHON] Remove APIs which have been deprecated in Koalas
d67d73b is described below

commit d67d73b70860d4e56fdcd6fc61f826245a52d186
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Tue May 25 11:16:27 2021 -0700

    [SPARK-35505][PYTHON] Remove APIs which have been deprecated in Koalas

    ### What changes were proposed in this pull request?

    Removes the APIs that were deprecated in Koalas.

    ### Why are the changes needed?

    Several APIs were already deprecated in Koalas, and they should not be carried over into pandas APIs on Spark.

    ### Does this PR introduce _any_ user-facing change?

    Yes. The APIs that were deprecated in Koalas are no longer available.

    ### How was this patch tested?

    Modified the tests that used the deprecated APIs; the other existing tests should pass unchanged.

    Closes #32656 from ueshin/issues/SPARK-35505/remove_deprecated.

    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 python/pyspark/pandas/base.py                 |  12 ---
 python/pyspark/pandas/frame.py                | 123 --------------------------
 python/pyspark/pandas/indexes/base.py         |  21 +----
 python/pyspark/pandas/indexes/multi.py        |  11 ---
 python/pyspark/pandas/series.py               |  40 ---------
 python/pyspark/pandas/tests/test_dataframe.py |  79 ++++++++++-------
 6 files changed, 46 insertions(+), 240 deletions(-)
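Each removed alias maps onto an accessor API that already exists; the FutureWarning each alias used to emit names its replacement. A minimal migration sketch follows (the sample data and variable names such as psdf are illustrative only, not part of the patch):

    from pyspark import pandas as ps

    # Hypothetical frame, used only to show the renamed calls.
    psdf = ps.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})

    pdf = psdf.to_pandas()                   # was: psdf.toPandas()
    schema = psdf.spark.schema()             # was: psdf.spark_schema()
    psdf.spark.print_schema()                # was: psdf.print_schema()
    hinted = psdf.spark.hint("broadcast")    # was: psdf.hint("broadcast")
    psdf.spark.explain()                     # was: psdf.explain()

    # Batch APIs moved under the koalas accessor.
    applied = psdf.koalas.apply_batch(lambda pdf: pdf + 1)      # was: psdf.apply_batch(...)
    transformed = psdf.koalas.transform_batch(lambda pdf: pdf)  # was: psdf.transform_batch(...)

    # Caching moved under the spark accessor, including on CachedDataFrame.
    cached = psdf.spark.cache()              # was: psdf.cache()
    level = cached.spark.storage_level       # was: cached.storage_level
    cached.spark.unpersist()                 # was: cached.unpersist()

    # Series/Index equivalents.
    dtype = psdf["a"].spark.data_type        # was: psdf["a"].spark_type
    col = psdf["a"].spark.column             # was: psdf["a"].spark_column
    renamed = psdf["a"].rename("c")          # was: psdf["a"].alias("c")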
diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index cb8a2c9..55a2c48 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -22,7 +22,6 @@ from abc import ABCMeta, abstractmethod
 from functools import wraps, partial
 from itertools import chain
 from typing import Any, Callable, Optional, Tuple, Union, cast, TYPE_CHECKING
-import warnings
 
 import numpy as np
 import pandas as pd  # noqa: F401
@@ -308,17 +307,6 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
         pass
 
     @property
-    def spark_column(self) -> Column:
-        warnings.warn(
-            "Series.spark_column is deprecated as of Series.spark.column. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.spark.column
-
-    spark_column.__doc__ = SparkIndexOpsMethods.column.__doc__
-
-    @property
     def _dtype_op(self):
         from pyspark.pandas.data_type_ops.base import DataTypeOps
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 9d2553d..709696e 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2298,27 +2298,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
 
     T = property(transpose)
 
-    def apply_batch(self, func, args=(), **kwds) -> "DataFrame":
-        warnings.warn(
-            "DataFrame.apply_batch is deprecated as of DataFrame.koalas.apply_batch. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.koalas.apply_batch(func, args=args, **kwds)
-
-    apply_batch.__doc__ = PandasOnSparkFrameMethods.apply_batch.__doc__
-
-    # TODO: Remove this API.
-    def map_in_pandas(self, func) -> "DataFrame":
-        warnings.warn(
-            "DataFrame.map_in_pandas is deprecated as of DataFrame.koalas.apply_batch. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.koalas.apply_batch(func)
-
-    map_in_pandas.__doc__ = PandasOnSparkFrameMethods.apply_batch.__doc__
-
     def apply(self, func, axis=0, args=(), **kwds) -> Union["Series", "DataFrame", "Index"]:
         """
         Apply a function along an axis of the DataFrame.
@@ -2768,16 +2747,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             lambda psser: psser.koalas.transform_batch(func, *args, **kwargs)
         )
 
-    def transform_batch(self, func, *args, **kwargs) -> "DataFrame":
-        warnings.warn(
-            "DataFrame.transform_batch is deprecated as of DataFrame.koalas.transform_batch. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.koalas.transform_batch(func, *args, **kwargs)
-
-    transform_batch.__doc__ = PandasOnSparkFrameMethods.transform_batch.__doc__
-
     def pop(self, item) -> "DataFrame":
         """
         Return item and drop from frame. Raise KeyError if not found.
@@ -4573,36 +4542,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         )
         return DataFrame(internal)
 
-    def cache(self) -> "CachedDataFrame":
-        warnings.warn(
-            "DataFrame.cache is deprecated as of DataFrame.spark.cache. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.spark.cache()
-
-    cache.__doc__ = SparkFrameMethods.cache.__doc__
-
-    def persist(self, storage_level=StorageLevel.MEMORY_AND_DISK) -> "CachedDataFrame":
-        warnings.warn(
-            "DataFrame.persist is deprecated as of DataFrame.spark.persist. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.spark.persist(storage_level)
-
-    persist.__doc__ = SparkFrameMethods.persist.__doc__
-
-    def hint(self, name: str, *parameters) -> "DataFrame":
-        warnings.warn(
-            "DataFrame.hint is deprecated as of DataFrame.spark.hint. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.spark.hint(name, *parameters)
-
-    hint.__doc__ = SparkFrameMethods.hint.__doc__
-
     def to_table(
         self,
         name: str,
@@ -4875,17 +4814,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         """
         return self._internal.to_pandas_frame.copy()
 
-    # Alias to maintain backward compatibility with Spark
-    def toPandas(self) -> pd.DataFrame:
-        warnings.warn(
-            "DataFrame.toPandas is deprecated as of DataFrame.to_pandas. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.to_pandas()
-
-    toPandas.__doc__ = to_pandas.__doc__
-
     def assign(self, **kwargs) -> "DataFrame":
         """
         Assign new columns to a DataFrame.
@@ -6345,26 +6273,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             ),
         )
 
-    def spark_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> StructType:
-        warnings.warn(
-            "DataFrame.spark_schema is deprecated as of DataFrame.spark.schema. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.spark.schema(index_col)
-
-    spark_schema.__doc__ = SparkFrameMethods.schema.__doc__
-
-    def print_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> None:
-        warnings.warn(
-            "DataFrame.print_schema is deprecated as of DataFrame.spark.print_schema. "
-            "Please use the API instead.",
-            FutureWarning,
-        )
-        return self.spark.print_schema(index_col)
-
-    print_schema.__doc__ = SparkFrameMethods.print_schema.__doc__
-
     def select_dtypes(self, include=None, exclude=None) -> "DataFrame":
         """
         Return a subset of the DataFrame's columns based on the column dtypes.
@@ -10926,16 +10834,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         else:
             return DataFrame(internal)
 
-    def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None) -> None:
-        warnings.warn(
-            "DataFrame.explain is deprecated as of DataFrame.spark.explain. "
" - "Please use the API instead.", - FutureWarning, - ) - return self.spark.explain(extended, mode) - - explain.__doc__ = SparkFrameMethods.explain.__doc__ - def take(self, indices, axis=0, **kwargs) -> "DataFrame": """ Return the elements in the given *positional* indices along an axis. @@ -11929,27 +11827,6 @@ class CachedDataFrame(DataFrame): # create accessor for Spark related methods. spark = CachedAccessor("spark", CachedSparkFrameMethods) - @property - def storage_level(self) -> StorageLevel: - warnings.warn( - "DataFrame.storage_level is deprecated as of DataFrame.spark.storage_level. " - "Please use the API instead.", - FutureWarning, - ) - return self.spark.storage_level - - storage_level.__doc__ = CachedSparkFrameMethods.storage_level.__doc__ - - def unpersist(self) -> None: - warnings.warn( - "DataFrame.unpersist is deprecated as of DataFrame.spark.unpersist. " - "Please use the API instead.", - FutureWarning, - ) - return self.spark.unpersist() - - unpersist.__doc__ = CachedSparkFrameMethods.unpersist.__doc__ - def _test(): import os diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index 67f89c0..f62f30d 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -38,7 +38,7 @@ from pandas._libs import lib from pyspark import sql as spark from pyspark.sql import functions as F -from pyspark.sql.types import DataType, FractionalType, IntegralType, TimestampType +from pyspark.sql.types import FractionalType, IntegralType, TimestampType from pyspark import pandas as ps # For running doctests and reference resolution in PyCharm. from pyspark.pandas.config import get_option, option_context @@ -483,15 +483,6 @@ class Index(IndexOpsMixin): """ return self._to_internal_pandas().copy() - def toPandas(self) -> pd.Index: - warnings.warn( - "Index.toPandas is deprecated as of Index.to_pandas. Please use the API instead.", - FutureWarning, - ) - return self.to_pandas() - - toPandas.__doc__ = to_pandas.__doc__ - def to_numpy(self, dtype: Optional[Union[str, Dtype]] = None, copy: bool = False) -> np.ndarray: """ A NumPy ndarray representing the values in this Index or MultiIndex. @@ -583,16 +574,6 @@ class Index(IndexOpsMixin): return None @property - def spark_type(self) -> DataType: - """ Returns the data type as defined by Spark, as a Spark DataType object.""" - warnings.warn( - "Index.spark_type is deprecated as of Index.spark.data_type. " - "Please use the API instead.", - FutureWarning, - ) - return self.spark.data_type - - @property def has_duplicates(self) -> bool: """ If index has duplicates, return True, otherwise False. diff --git a/python/pyspark/pandas/indexes/multi.py b/python/pyspark/pandas/indexes/multi.py index 9c4e95c..ee40d06 100644 --- a/python/pyspark/pandas/indexes/multi.py +++ b/python/pyspark/pandas/indexes/multi.py @@ -18,7 +18,6 @@ from distutils.version import LooseVersion from functools import partial from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, cast, no_type_check -import warnings import pandas as pd from pandas.api.types import is_list_like @@ -681,16 +680,6 @@ class MultiIndex(Index): # series-like operations. In that case, it creates new Index object instead of MultiIndex. return super().to_pandas() - def toPandas(self) -> pd.MultiIndex: - warnings.warn( - "MultiIndex.toPandas is deprecated as of MultiIndex.to_pandas. 
" - "Please use the API instead.", - FutureWarning, - ) - return self.to_pandas() - - toPandas.__doc__ = to_pandas.__doc__ - def nunique(self, dropna: bool = True, approx: bool = False, rsd: float = 0.05) -> int: raise NotImplementedError("nunique is not defined for MultiIndex") diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index d4c72a4..e20ddbc 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -22,7 +22,6 @@ import datetime import re import inspect import sys -import warnings from collections.abc import Mapping from functools import partial, wraps, reduce from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar, Union, cast @@ -467,17 +466,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]): """ return [self.index] - @property - def spark_type(self): - warnings.warn( - "Series.spark_type is deprecated as of Series.spark.data_type. " - "Please use the API instead.", - FutureWarning, - ) - return self.spark.data_type - - spark_type.__doc__ = SparkSeriesMethods.data_type.__doc__ - # Arithmetic Operators def add(self, other) -> "Series": return self + other @@ -1017,14 +1005,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]): else: return self.apply(arg) - def alias(self, name) -> "Series": - """An alias for :meth:`Series.rename`.""" - warnings.warn( - "Series.alias is deprecated as of Series.rename. Please use the API instead.", - FutureWarning, - ) - return self.rename(name) - @property def shape(self): """Return a tuple of the shape of the underlying data.""" @@ -1518,16 +1498,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]): """ return self._to_internal_pandas().copy() - # Alias to maintain backward compatibility with Spark - def toPandas(self) -> pd.Series: - warnings.warn( - "Series.toPandas is deprecated as of Series.to_pandas. Please use the API instead.", - FutureWarning, - ) - return self.to_pandas() - - toPandas.__doc__ = to_pandas.__doc__ - def to_list(self) -> List: """ Return a list of the values. @@ -3300,16 +3270,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]): else: return self.apply(func, args=args, **kwargs) - def transform_batch(self, func, *args, **kwargs) -> "ps.Series": - warnings.warn( - "Series.transform_batch is deprecated as of Series.koalas.transform_batch. " - "Please use the API instead.", - FutureWarning, - ) - return self.koalas.transform_batch(func, *args, **kwargs) - - transform_batch.__doc__ = PandasOnSparkSeriesMethods.transform_batch.__doc__ - def round(self, decimals=0) -> "Series": """ Round each value in a Series to the given number of decimals. diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py index 590900f..5e6b6b9 100644 --- a/python/pyspark/pandas/tests/test_dataframe.py +++ b/python/pyspark/pandas/tests/test_dataframe.py @@ -28,6 +28,7 @@ from pandas.tseries.offsets import DateOffset from pyspark import StorageLevel from pyspark.ml.linalg import SparseVector from pyspark.sql import functions as F +from pyspark.sql.types import StructType from pyspark import pandas as ps from pyspark.pandas.config import option_context @@ -1849,7 +1850,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): def test_to_pandas(self): pdf, psdf = self.df_pair - self.assert_eq(psdf.toPandas(), pdf) self.assert_eq(psdf.to_pandas(), pdf) def test_isin(self): @@ -4324,8 +4324,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): ) psdf = ps.DataFrame(pdf) - # One to test alias. 
-        self.assert_eq(psdf.apply_batch(lambda pdf: pdf + 1).sort_index(), (pdf + 1).sort_index())
         self.assert_eq(
             psdf.koalas.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
             (pdf + 1).sort_index(),
@@ -4377,10 +4375,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
         )
         psdf = ps.DataFrame(pdf)
 
-        # One to test alias.
-        self.assert_eq(
-            psdf.transform_batch(lambda pdf: pdf + 1).sort_index(), (pdf + 1).sort_index()
-        )
         self.assert_eq(
             psdf.koalas.transform_batch(lambda pdf: pdf.c + 1).sort_index(),
             (pdf.c + 1).sort_index(),
@@ -4445,14 +4439,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
         )
         psdf = ps.range(10)
 
-        # One to test alias.
-        psdf["d"] = psdf.id.transform_batch(lambda ser: ser + 1)
-        self.assert_eq(
-            psdf,
-            pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),
-        )
-
-        psdf = ps.range(10)
 
         def plus_one(pdf) -> ps.Series[np.int64]:
             return pdf.id + 1
@@ -4796,10 +4782,10 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
         )
         psdf = ps.from_pandas(pdf)
 
-        with psdf.cache() as cached_df:
+        with psdf.spark.cache() as cached_df:
             self.assert_eq(isinstance(cached_df, CachedDataFrame), True)
             self.assert_eq(
-                repr(cached_df.storage_level), repr(StorageLevel(True, True, False, True))
+                repr(cached_df.spark.storage_level), repr(StorageLevel(True, True, False, True))
             )
 
     def test_persist(self):
@@ -4815,11 +4801,11 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
         ]
 
         for storage_level in storage_levels:
-            with psdf.persist(storage_level) as cached_df:
+            with psdf.spark.persist(storage_level) as cached_df:
                 self.assert_eq(isinstance(cached_df, CachedDataFrame), True)
-                self.assert_eq(repr(cached_df.storage_level), repr(storage_level))
+                self.assert_eq(repr(cached_df.spark.storage_level), repr(storage_level))
 
-        self.assertRaises(TypeError, lambda: psdf.persist("DISK_ONLY"))
+        self.assertRaises(TypeError, lambda: psdf.spark.persist("DISK_ONLY"))
 
     def test_squeeze(self):
         axises = [None, 0, 1, "rows", "index", "columns"]
@@ -5075,8 +5061,31 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
             },
             columns=["a", "b", "c", "d", "e", "f"],
         )
-        self.assertEqual(psdf.spark_schema(), psdf.spark.schema())
-        self.assertEqual(psdf.spark_schema("index"), psdf.spark.schema("index"))
+
+        actual = psdf.spark.schema()
+        expected = (
+            StructType()
+            .add("a", "string", False)
+            .add("b", "long", False)
+            .add("c", "byte", False)
+            .add("d", "double", False)
+            .add("e", "boolean", False)
+            .add("f", "timestamp", False)
+        )
+        self.assertEqual(actual, expected)
+
+        actual = psdf.spark.schema("index")
+        expected = (
+            StructType()
+            .add("index", "long", False)
+            .add("a", "string", False)
+            .add("b", "long", False)
+            .add("c", "byte", False)
+            .add("d", "double", False)
+            .add("e", "boolean", False)
+            .add("f", "timestamp", False)
+        )
+        self.assertEqual(actual, expected)
 
     def test_print_schema(self):
         psdf = ps.DataFrame(
@@ -5088,15 +5097,22 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
         try:
             out = StringIO()
             sys.stdout = out
-            psdf.print_schema()
+            psdf.spark.print_schema()
             actual = out.getvalue().strip()
 
+            self.assertTrue("a: string" in actual, actual)
+            self.assertTrue("b: long" in actual, actual)
+            self.assertTrue("c: byte" in actual, actual)
+
             out = StringIO()
             sys.stdout = out
-            psdf.spark.print_schema()
-            expected = out.getvalue().strip()
+            psdf.spark.print_schema(index_col="index")
+            actual = out.getvalue().strip()
 
-            self.assertEqual(actual, expected)
+            self.assertTrue("index: long" in actual, actual)
+            self.assertTrue("a: string" in actual, actual)
+            self.assertTrue("b: long" in actual, actual)
+            self.assertTrue("c: byte" in actual, actual)
         finally:
             sys.stdout = prev
 
@@ -5107,20 +5123,15 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
         psdf2 = ps.DataFrame(
             {"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]}, columns=["rkey", "value"]
         )
-        merged = psdf1.merge(psdf2.hint("broadcast"), left_on="lkey", right_on="rkey")
+        merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
 
         prev = sys.stdout
         try:
             out = StringIO()
             sys.stdout = out
-            merged.explain()
-            actual = out.getvalue().strip()
-
-            out = StringIO()
-            sys.stdout = out
             merged.spark.explain()
-            expected = out.getvalue().strip()
+            actual = out.getvalue().strip()
 
-            self.assertEqual(actual, expected)
+            self.assertTrue("Broadcast" in actual, actual)
         finally:
             sys.stdout = prev

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org