This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f83e5ec202b [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0 f83e5ec202b is described below commit f83e5ec202be68d6640f9f9403d96a39ef993f82 Author: Haejoon Lee <haejoon....@databricks.com> AuthorDate: Mon Sep 18 16:27:09 2023 -0700 [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0 ### What changes were proposed in this pull request? This PR proposes to support pandas 2.1.0 for PySpark. See [What's new in 2.1.0](https://pandas.pydata.org/docs/dev/whatsnew/v2.1.0.html) for more detail. ### Why are the changes needed? We should follow the latest version of pandas. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The existing CI should pass with Pandas 2.1.0 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42793 from itholic/pandas_2.1.0. Lead-authored-by: Haejoon Lee <haejoon....@databricks.com> Co-authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- dev/infra/Dockerfile | 4 +- .../source/migration_guide/pyspark_upgrade.rst | 3 + .../docs/source/reference/pyspark.pandas/frame.rst | 1 + python/pyspark/pandas/base.py | 2 - python/pyspark/pandas/frame.py | 143 +++++++++++++++++++-- python/pyspark/pandas/generic.py | 42 ++++++ python/pyspark/pandas/groupby.py | 50 ++++++- python/pyspark/pandas/indexes/base.py | 6 +- python/pyspark/pandas/indexes/datetimes.py | 18 +++ python/pyspark/pandas/indexes/timedelta.py | 7 + python/pyspark/pandas/namespace.py | 21 ++- python/pyspark/pandas/plot/matplotlib.py | 2 +- python/pyspark/pandas/series.py | 83 +++++++++++- python/pyspark/pandas/supported_api_gen.py | 2 +- .../pandas/tests/computation/test_corrwith.py | 5 +- python/pyspark/pandas/tests/test_stats.py | 31 ++++- python/pyspark/pandas/typedef/typehints.py | 15 ++- python/pyspark/pandas/window.py | 4 + python/pyspark/sql/connect/session.py | 3 +- python/pyspark/sql/pandas/conversion.py | 16 
++- python/pyspark/sql/pandas/serializers.py | 8 +- python/pyspark/sql/pandas/types.py | 14 +- .../sql/tests/pandas/test_pandas_cogrouped_map.py | 2 +- 23 files changed, 415 insertions(+), 67 deletions(-) diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile index 8e5a3cb7c05..d196d0e97c5 100644 --- a/dev/infra/Dockerfile +++ b/dev/infra/Dockerfile @@ -84,8 +84,8 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht # See more in SPARK-39735 ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library" -RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib -RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*' +RUN pypy3 -m pip install numpy 'pandas<=2.1.0' scipy coverage matplotlib +RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.1.0' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*' # Add Python deps for Spark Connect. RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3' 'googleapis-common-protos==1.56.4' diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst index 3513f0a878e..d081275dc83 100644 --- a/python/docs/source/migration_guide/pyspark_upgrade.rst +++ b/python/docs/source/migration_guide/pyspark_upgrade.rst @@ -22,6 +22,7 @@ Upgrading PySpark Upgrading from PySpark 3.5 to 4.0 --------------------------------- +* In Spark 4.0, it is recommended to use Pandas version 2.0.0 or above with PySpark for optimal compatibility. * In Spark 4.0, the minimum supported version for Pandas has been raised from 1.0.5 to 1.4.4 in PySpark. * In Spark 4.0, the minimum supported version for Numpy has been raised from 1.15 to 1.21 in PySpark. 
* In Spark 4.0, ``Int64Index`` and ``Float64Index`` have been removed from pandas API on Spark, ``Index`` should be used directly. @@ -44,6 +45,8 @@ Upgrading from PySpark 3.5 to 4.0 * In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and ``ps.read_excel`` has been removed from pandas API on Spark. * In Spark 4.0, ``null_counts`` parameter from ``DataFrame.info`` has been removed from pandas API on Spark, use ``show_counts`` instead. * In Spark 4.0, the result of ``MultiIndex.append`` does not keep the index names from pandas API on Spark. +* In Spark 4.0, ``DataFrameGroupBy.agg`` with lists respecting ``as_index=False`` from pandas API on Spark. +* In Spark 4.0, ``DataFrame.stack`` guarantees the order of existing columns instead of sorting them lexicographically from pandas API on Spark. * In Spark 4.0, ``True`` or ``False`` to ``inclusive`` parameter from ``Series.between`` has been removed from pandas API on Spark, use ``both`` or ``neither`` instead respectively. * In Spark 4.0, ``Index.asi8`` has been removed from pandas API on Spark, use ``Index.astype`` instead. * In Spark 4.0, ``Index.is_type_compatible`` has been removed from pandas API on Spark, use ``Index.isin`` instead. diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst index 5f839a803d7..a22078f86e2 100644 --- a/python/docs/source/reference/pyspark.pandas/frame.rst +++ b/python/docs/source/reference/pyspark.pandas/frame.rst @@ -127,6 +127,7 @@ Function application, GroupBy & Window DataFrame.apply DataFrame.applymap + DataFrame.map DataFrame.pipe DataFrame.agg DataFrame.aggregate diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py index ef0b51f757d..ed6e983fdc8 100644 --- a/python/pyspark/pandas/base.py +++ b/python/pyspark/pandas/base.py @@ -978,7 +978,6 @@ class IndexOpsMixin(object, metaclass=ABCMeta): NA values, such as None or numpy.NaN, get mapped to True values. 
Everything else gets mapped to False values. Characters such as empty strings '' or numpy.inf are not considered NA values - (unless you set pandas.options.mode.use_inf_as_na = True). Returns ------- @@ -1012,7 +1011,6 @@ class IndexOpsMixin(object, metaclass=ABCMeta): Return a boolean same-sized object indicating if the values are not NA. Non-missing values get mapped to True. Characters such as empty strings '' or numpy.inf are not considered NA values - (unless you set pandas.options.mode.use_inf_as_na = True). NA values, such as None or numpy.NaN, get mapped to False values. Returns diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index b19f55e7eba..26cb15417f5 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -1271,6 +1271,8 @@ class DataFrame(Frame, Generic[T]): This method applies a function that accepts and returns a scalar to every element of a DataFrame. + .. deprecated:: 4.0.0 + .. note:: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. @@ -1321,7 +1323,74 @@ class DataFrame(Frame, Generic[T]): 0 1.000000 4.494400 1 11.262736 20.857489 """ + warnings.warn( + "DataFrame.applymap has been deprecated. Use DataFrame.map instead", FutureWarning + ) + + # TODO: We can implement shortcut theoretically since it creates new DataFrame + # anyway and we don't have to worry about operations on different DataFrames. + return self.map(func=func) + + def map(self, func: Callable[[Any], Any]) -> "DataFrame": + """ + Apply a function to a Dataframe elementwise. + + This method applies a function that accepts and returns a scalar + to every element of a DataFrame. + + .. versionadded:: 4.0.0 + DataFrame.applymap was deprecated and renamed to DataFrame.map. + + .. 
note:: this API executes the function once to infer the type which is + potentially expensive, for instance, when the dataset is created after + aggregations or sorting. + + To avoid this, specify return type in ``func``, for instance, as below: + + >>> def square(x) -> np.int32: + ... return x ** 2 + + pandas-on-Spark uses return type hints and does not try to infer the type. + + Parameters + ---------- + func : callable + Python function returns a single value from a single value. + + Returns + ------- + DataFrame + Transformed DataFrame. + + Examples + -------- + >>> df = ps.DataFrame([[1, 2.12], [3.356, 4.567]]) + >>> df + 0 1 + 0 1.000 2.120 + 1 3.356 4.567 + >>> def str_len(x) -> int: + ... return len(str(x)) + >>> df.map(str_len) + 0 1 + 0 3 4 + 1 5 5 + + >>> def power(x) -> float: + ... return x ** 2 + >>> df.map(power) + 0 1 + 0 1.000000 4.494400 + 1 11.262736 20.857489 + + You can omit type hints and let pandas-on-Spark infer its type. + + >>> df.map(lambda x: x ** 2) + 0 1 + 0 1.000000 4.494400 + 1 11.262736 20.857489 + """ # TODO: We can implement shortcut theoretically since it creates new DataFrame # anyway and we don't have to worry about operations on different DataFrames. return self._apply_series_op(lambda psser: psser.apply(func)) @@ -5556,6 +5625,10 @@ defaultdict(<class 'list'>, {'col..., 'col...})] Parameters ---------- data : ndarray (structured dtype), list of tuples, dict, or DataFrame + + .. deprecated:: 4.0.0 + Passing a DataFrame is deprecated. + index : string, list of fields, array-like Field of array to use as the index, alternately a specific set of input labels to use exclude : sequence, default None @@ -5952,6 +6025,9 @@ defaultdict(<class 'list'>, {'col..., 'col...})] Method to use for filling holes in reindexed Series pad / ffill: propagate last valid observation forward to next valid backfill / bfill: use NEXT valid observation to fill gap + + .. deprecated:: 4.0.0 + axis : {0 or `index`} 1 and `columns` are not supported. 
inplace : boolean, default False @@ -5963,6 +6039,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})] this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None + .. deprecated:: 4.0.0 + Returns ------- DataFrame @@ -6046,6 +6124,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})] return psser._fillna(value=value, method=method, axis=axis, limit=limit) elif method is not None: + warnings.warn( + "DataFrame.fillna with 'method' is deprecated and will raise in a future version. " + "Use DataFrame.ffill() or DataFrame.bfill() instead.", + FutureWarning, + ) def op(psser: ps.Series) -> ps.Series: return psser._fillna(value=value, method=method, axis=axis, limit=limit) @@ -6121,6 +6204,21 @@ defaultdict(<class 'list'>, {'col..., 'col...})] If value is a list or tuple, value should be of the same length with to_replace. inplace : boolean, default False Fill in place (do not create a new object) + limit : int, default None + Maximum size gap to forward or backward fill. + + .. deprecated:: 4.0.0 + + regex : bool or str, default False + Whether to interpret to_replace and/or value as regular expressions. + If this is True then to_replace must be a string. + Alternatively, this could be a regular expression in which case to_replace must be None. + method : 'pad', default None + The method to use when for replacement, when to_replace is a scalar, + list or tuple and value is None. + + .. 
deprecated:: 4.0.0 + Returns ------- @@ -6189,8 +6287,18 @@ defaultdict(<class 'list'>, {'col..., 'col...})] 3 Hulk Smash """ if method != "pad": + warnings.warn( + "The 'method' keyword in DataFrame.replace is deprecated " + "and will be removed in a future version.", + FutureWarning, + ) raise NotImplementedError("replace currently works only for method='pad") if limit is not None: + warnings.warn( + "The 'limit' keyword in DataFrame.replace is deprecated " + "and will be removed in a future version.", + FutureWarning, + ) raise NotImplementedError("replace currently works only when limit=None") if regex is not False: raise NotImplementedError("replace currently doesn't supports regex") @@ -6221,6 +6329,13 @@ defaultdict(<class 'list'>, {'col..., 'col...})] return psser else: + if value is None: + warnings.warn( + "DataFrame.replace without 'value' and with non-dict-like 'to_replace' " + "is deprecated and will raise in a future version. " + "Explicitly specify the new values instead.", + FutureWarning, + ) def op(psser: ps.Series) -> ps.Series: return psser.replace(to_replace=to_replace, value=value, regex=regex) @@ -6344,6 +6459,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})] When having a DataFrame with dates as index, this function can select the last few rows based on a date offset. + .. deprecated:: 4.0.0 + Parameters ---------- offset : str or DateOffset @@ -6383,6 +6500,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})] 3 observed days in the dataset, and therefore data for 2018-04-11 was not returned. """ + warnings.warn( + "last is deprecated and will be removed in a future version. 
" + "Please create a mask and filter using `.loc` instead", + FutureWarning, + ) # Check index type should be format DateTime if not isinstance(self.index, ps.DatetimeIndex): raise TypeError("'last' only supports a DatetimeIndex") @@ -6401,6 +6523,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})] When having a DataFrame with dates as index, this function can select the first few rows based on a date offset. + .. deprecated:: 4.0.0 + Parameters ---------- offset : str or DateOffset @@ -6440,6 +6564,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})] 3 observed days in the dataset, and therefore data for 2018-04-13 was not returned. """ + warnings.warn( + "first is deprecated and will be removed in a future version. " + "Please create a mask and filter using `.loc` instead", + FutureWarning, + ) # Check index type should be format DatetimeIndex if not isinstance(self.index, ps.DatetimeIndex): raise TypeError("'first' only supports a DatetimeIndex") @@ -10527,12 +10656,12 @@ defaultdict(<class 'list'>, {'col..., 'col...})] kg m cat 1.0 2.0 dog 3.0 4.0 - >>> df_multi_level_cols2.stack().sort_index() # doctest: +SKIP - height weight - cat kg NaN 1.0 - m 2.0 NaN - dog kg NaN 3.0 - m 4.0 NaN + >>> df_multi_level_cols2.stack().sort_index() + weight height + cat kg 1.0 NaN + m NaN 2.0 + dog kg 3.0 NaN + m NaN 4.0 """ from pyspark.pandas.series import first_series @@ -10558,8 +10687,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})] index_values.add(value) - column_labels = dict(sorted(column_labels.items(), key=lambda x: x[0])) - index_name = self._internal.column_label_names[-1] column_label_names = self._internal.column_label_names[:-1] if len(column_label_names) == 0: diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py index b540045f88f..81f4f6db7ed 100644 --- a/python/pyspark/pandas/generic.py +++ b/python/pyspark/pandas/generic.py @@ -1317,6 +1317,13 @@ class Frame(object, metaclass=ABCMeta): >>> df['b'].sum(min_count=3) nan """ + 
if axis is None and isinstance(self, ps.DataFrame): + warnings.warn( + "The behavior of DataFrame.sum with axis=None is deprecated, " + "in a future version this will reduce over both axes and return a scalar. " + "To retain the old behavior, pass axis=0 (or do not pass axis)", + FutureWarning, + ) axis = validate_axis(axis) if numeric_only is None and axis == 0: @@ -1418,6 +1425,13 @@ class Frame(object, metaclass=ABCMeta): >>> ps.Series([]).prod(min_count=1) # doctest: +SKIP nan """ + if axis is None and isinstance(self, ps.DataFrame): + warnings.warn( + "The behavior of DataFrame.product with axis=None is deprecated, " + "in a future version this will reduce over both axes and return a scalar. " + "To retain the old behavior, pass axis=0 (or do not pass axis)", + FutureWarning, + ) axis = validate_axis(axis) if numeric_only is None and axis == 0: @@ -1870,6 +1884,13 @@ class Frame(object, metaclass=ABCMeta): if not isinstance(ddof, int): raise TypeError("ddof must be integer") + if axis is None and isinstance(self, ps.DataFrame): + warnings.warn( + "The behavior of DataFrame.std with axis=None is deprecated, " + "in a future version this will reduce over both axes and return a scalar. " + "To retain the old behavior, pass axis=0 (or do not pass axis)", + FutureWarning, + ) axis = validate_axis(axis) if numeric_only is None and axis == 0: @@ -1962,6 +1983,13 @@ class Frame(object, metaclass=ABCMeta): if not isinstance(ddof, int): raise TypeError("ddof must be integer") + if axis is None and isinstance(self, ps.DataFrame): + warnings.warn( + "The behavior of DataFrame.var with axis=None is deprecated, " + "in a future version this will reduce over both axes and return a scalar. 
" + "To retain the old behavior, pass axis=0 (or do not pass axis)", + FutureWarning, + ) axis = validate_axis(axis) if numeric_only is None and axis == 0: @@ -2191,6 +2219,13 @@ class Frame(object, metaclass=ABCMeta): if not isinstance(ddof, int): raise TypeError("ddof must be integer") + if axis is None and isinstance(self, ps.DataFrame): + warnings.warn( + "The behavior of DataFrame.sem with axis=None is deprecated, " + "in a future version this will reduce over both axes and return a scalar. " + "To retain the old behavior, pass axis=0 (or do not pass axis)", + FutureWarning, + ) axis = validate_axis(axis) if numeric_only is None and axis == 0: @@ -2448,6 +2483,8 @@ class Frame(object, metaclass=ABCMeta): This must be a boolean scalar value, either True or False. Raise a ValueError if the object does not have exactly 1 element, or that element is not boolean + .. deprecated:: 4.0.0 + Returns ------- bool @@ -2479,6 +2516,11 @@ class Frame(object, metaclass=ABCMeta): ... ValueError: bool cannot act on a non-boolean single element DataFrame """ + warnings.warn( + f"{self.__class__.__name__}.bool is now deprecated " + "and will be removed in future version.", + FutureWarning, + ) if isinstance(self, ps.DataFrame): df = self elif isinstance(self, ps.Series): diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index bbc292cf744..c7924fa3345 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -305,7 +305,14 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is not self._psdf ) if len(should_drop_index) > 0: - psdf = psdf.reset_index(level=should_drop_index, drop=True) + drop = not any( + [ + isinstance(func_or_funcs[gkey.name], list) + for gkey in self._groupkeys + if gkey.name in func_or_funcs + ] + ) + psdf = psdf.reset_index(level=should_drop_index, drop=drop) if len(should_drop_index) < len(self._groupkeys): psdf = psdf.reset_index() @@ 
-701,6 +708,14 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): raise TypeError("must be real number, not %s" % type(q).__name__) if not 0 <= q <= 1: raise ValueError("'q' must be between 0 and 1. Got '%s' instead" % q) + if any(isinstance(_agg_col.spark.data_type, BooleanType) for _agg_col in self._agg_columns): + warnings.warn( + f"Allowing bool dtype in {self.__class__.__name__}.quantile is deprecated " + "and will raise in a future version, matching the Series/DataFrame behavior. " + "Cast to uint8 dtype before calling quantile instead.", + FutureWarning, + ) + return self._reduce_for_stat_function( lambda col: F.percentile_approx(col.cast(DoubleType()), q, accuracy), accepted_spark_types=(NumericType, BooleanType), @@ -2293,7 +2308,6 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): should_resolve=True, ) - # TODO: add axis parameter def idxmax(self, skipna: bool = True) -> FrameLike: """ Return index of first occurrence of maximum over requested axis in group. @@ -2376,7 +2390,6 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): ) return self._handle_output(DataFrame(internal)) - # TODO: add axis parameter def idxmin(self, skipna: bool = True) -> FrameLike: """ Return index of first occurrence of minimum over requested axis in group. @@ -2479,8 +2492,16 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): Method to use for filling holes in reindexed Series pad / ffill: propagate last valid observation forward to next valid backfill / bfill: use NEXT valid observation to fill gap + + .. deprecated:: 4.0.0 + axis : {0 or `index`} 1 and `columns` are not supported. + + .. deprecated:: 4.0.0 + For axis=1, operate on the underlying object instead. + Otherwise the axis keyword is not necessary. 
+ inplace : boolean, default False Fill in place (do not create a new object) limit : int, default None @@ -2490,6 +2511,9 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None + .. deprecated:: 4.0.0 + + Returns ------- DataFrame @@ -2527,11 +2551,19 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): 2 3.0 1.0 5 3 3.0 1.0 4 """ + should_resolve = method is not None + if should_resolve: + warnings.warn( + "DataFrameGroupBy.fillna with 'method' is deprecated " + "and will raise in a future version. " + "Use DataFrameGroupBy.ffill() or DataFrameGroupBy.bfill() instead.", + FutureWarning, + ) return self._apply_series_op( lambda sg: sg._psser._fillna( value=value, method=method, axis=axis, limit=limit, part_cols=sg._groupkeys_scols ), - should_resolve=(method is not None), + should_resolve=should_resolve, ) def bfill(self, limit: Optional[int] = None) -> FrameLike: @@ -3573,6 +3605,16 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): ) ) if not self._as_index: + column_names = [column.name for column in self._agg_columns] + for groupkey in self._groupkeys: + if groupkey.name not in column_names: + warnings.warn( + "A grouping was used that is not in the columns of the DataFrame and so " + "was excluded from the result. " + "This grouping will be included in a future version. 
" + "Add the grouping as a column of the DataFrame to silence this warning.", + FutureWarning, + ) should_drop_index = set( i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is not self._psdf ) diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index 28e98bc4cf2..48ce22b6e51 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -34,9 +34,7 @@ import pandas as pd import numpy as np from pandas.api.types import ( # type: ignore[attr-defined] is_list_like, - is_interval_dtype, is_bool_dtype, - is_categorical_dtype, is_integer_dtype, is_float_dtype, is_numeric_dtype, @@ -1023,7 +1021,7 @@ class Index(IndexOpsMixin): >>> ps.DataFrame({'a': [1]}, index=[1]).index.is_categorical() False """ - return is_categorical_dtype(self.dtype) + return isinstance(self.dtype, pd.CategoricalDtype) def is_floating(self) -> bool: """ @@ -1056,7 +1054,7 @@ class Index(IndexOpsMixin): >>> ps.DataFrame({'a': [1]}, index=[1]).index.is_interval() False """ - return is_interval_dtype(self.dtype) + return isinstance(self.dtype, pd.IntervalDtype) def is_numeric(self) -> bool: """ diff --git a/python/pyspark/pandas/indexes/datetimes.py b/python/pyspark/pandas/indexes/datetimes.py index f37661cb6a0..2c208974167 100644 --- a/python/pyspark/pandas/indexes/datetimes.py +++ b/python/pyspark/pandas/indexes/datetimes.py @@ -45,9 +45,15 @@ class DatetimeIndex(Index): inferred frequency upon creation. normalize : bool, default False Normalize start/end dates to midnight before generating date range. + + .. deprecated:: 4.0.0 + closed : {'left', 'right'}, optional Set whether to include `start` and `end` that are on the boundary. The default includes boundary points on either end. + + .. deprecated:: 4.0.0 + ambiguous : 'infer', bool-ndarray, 'NaT', default 'raise' When clocks moved backward due to DST, ambiguous times may arise. 
For example in Central European Time (UTC+01), when going from 03:00 @@ -111,6 +117,18 @@ class DatetimeIndex(Index): copy=False, name=None, ) -> "DatetimeIndex": + if closed is not None: + warnings.warn( + "The 'closed' keyword in DatetimeIndex construction is deprecated " + "and will be removed in a future version.", + FutureWarning, + ) + if normalize is not None: + warnings.warn( + "The 'normalize' keyword in DatetimeIndex construction is deprecated " + "and will be removed in a future version.", + FutureWarning, + ) if not is_hashable(name): raise TypeError("Index.name must be a hashable type") diff --git a/python/pyspark/pandas/indexes/timedelta.py b/python/pyspark/pandas/indexes/timedelta.py index b99c78542e7..3457ebb5bc5 100644 --- a/python/pyspark/pandas/indexes/timedelta.py +++ b/python/pyspark/pandas/indexes/timedelta.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import warnings from typing import cast, no_type_check, Any from functools import partial @@ -100,6 +101,12 @@ class TimedeltaIndex(Index): copy=False, name=None, ) -> "TimedeltaIndex": + if closed is not None: + warnings.warn( + "The 'closed' keyword in TimedeltaIndex construction is deprecated " + "and will be removed in a future version.", + FutureWarning, + ) if not is_hashable(name): raise TypeError("Index.name must be a hashable type") diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py index 27ff8b1d632..2f951608b72 100644 --- a/python/pyspark/pandas/namespace.py +++ b/python/pyspark/pandas/namespace.py @@ -43,7 +43,6 @@ import numpy as np import pandas as pd from pandas.api.types import ( # type: ignore[attr-defined] is_datetime64_dtype, - is_datetime64tz_dtype, is_list_like, ) from pandas.tseries.offsets import DateOffset @@ -1197,7 +1196,7 @@ def read_excel( reset_index = pdf.reset_index() for name, col in reset_index.items(): dt = col.dtype - if is_datetime64_dtype(dt) or 
is_datetime64tz_dtype(dt): + if is_datetime64_dtype(dt) or isinstance(dt, pd.DatetimeTZDtype): continue reset_index[name] = col.replace({np.nan: None}) pdf = reset_index @@ -1249,6 +1248,10 @@ def read_html( lxml only accepts the http, FTP and file URL protocols. If you have a URL that starts with ``'https'`` you might try removing the ``'s'``. + .. deprecated:: 4.0.0 + Passing html literal strings is deprecated. + Wrap literal string/bytes input in io.StringIO/io.BytesIO instead. + match : str or compiled regular expression, optional The set of tables containing text matching this regex or string will be returned. Unless the HTML is extremely simple you will probably need to @@ -1921,6 +1924,10 @@ def to_timedelta( * 'ns' / 'nanoseconds' / 'nano' / 'nanos' / 'nanosecond' / 'N' Must not be specified when `arg` context strings and ``errors="raise"``. + + .. deprecated:: 4.0.0 + Units 'T' and 'L' are deprecated and will be removed in a future version. + errors : {'ignore', 'raise', 'coerce'}, default 'raise' - If 'raise', then invalid parsing will raise an exception. - If 'coerce', then invalid parsing will be set as NaT. @@ -2472,6 +2479,16 @@ def concat( if join not in ["inner", "outer"]: raise ValueError("Only can inner (intersect) or outer (union) join the other axis.") + if all([obj.empty for obj in objs]): + warnings.warn( + "The behavior of array concatenation with empty entries is " + "deprecated. In a future version, this will no longer exclude " + "empty items when determining the result dtype. 
" + "To retain the old behavior, exclude the empty entries before " + "the concat operation.", + FutureWarning, + ) + axis = validate_axis(axis) psdf: DataFrame if axis == 1: diff --git a/python/pyspark/pandas/plot/matplotlib.py b/python/pyspark/pandas/plot/matplotlib.py index 609ad271503..0164ec9f980 100644 --- a/python/pyspark/pandas/plot/matplotlib.py +++ b/python/pyspark/pandas/plot/matplotlib.py @@ -19,7 +19,7 @@ from distutils.version import LooseVersion import matplotlib as mat import numpy as np -from matplotlib.axes._base import _process_plot_format +from matplotlib.axes._base import _process_plot_format # type: ignore[attr-defined] from pandas.core.dtypes.inference import is_list_like from pandas.io.formats.printing import pprint_thing diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 70592155012..f1b785e1b41 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -21,6 +21,7 @@ A wrapper class for Spark Column to behave like pandas Series. import datetime import re import inspect +import warnings from collections.abc import Mapping from functools import partial, reduce from typing import ( @@ -2074,6 +2075,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]): Method to use for filling holes in reindexed Series pad / ffill: propagate last valid observation forward to next valid backfill / bfill: use NEXT valid observation to fill gap + + .. deprecated:: 4.0.0 + axis : {0 or `index`} 1 and `columns` are not supported. inplace : boolean, default False @@ -2085,6 +2089,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]): this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None + .. 
deprecated:: 4.0.0 + + Returns ------- Series @@ -2136,6 +2143,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]): psser = self._fillna(value=value, method=method, axis=axis, limit=limit) if method is not None: + warnings.warn( + "Series.fillna with 'method' is deprecated and will raise in a future version. " + "Use Series.ffill() or Series.bfill() instead.", + FutureWarning, + ) psser = DataFrame(psser._psdf._internal.resolved_copy)._psser_for(self._column_label) inplace = validate_bool_kwarg(inplace, "inplace") @@ -2690,6 +2702,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]): When having a Series with dates as index, this function can select the last few elements based on a date offset. + .. deprecated:: 4.0.0 + Parameters ---------- offset : str or DateOffset @@ -2728,6 +2742,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]): 3 observed days in the dataset, and therefore data for 2018-04-11 was not returned. """ + warnings.warn( + "last is deprecated and will be removed in a future version. " + "Please create a mask and filter using `.loc` instead", + FutureWarning, + ) return first_series(self.to_frame().last(offset)).rename(self.name) def first(self, offset: Union[str, DateOffset]) -> "Series": @@ -2737,6 +2756,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]): When having a Series with dates as index, this function can select the first few elements based on a date offset. + .. deprecated:: 4.0.0 + Parameters ---------- offset : str or DateOffset @@ -2775,6 +2796,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]): 3 observed days in the dataset, and therefore data for 2018-04-13 was not returned. """ + warnings.warn( + "first is deprecated and will be removed in a future version. 
" + "Please create a mask and filter using `.loc` instead", + FutureWarning, + ) return first_series(self.to_frame().first(offset)).rename(self.name) # TODO: Categorical type isn't supported (due to PySpark's limitation) and @@ -3120,6 +3146,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]): """ Interchange axes and swap values axes appropriately. + .. deprecated:: 4.0.0 + Parameters ---------- i: {0 or 'index', 1 or 'columns'}. The axis to swap. @@ -3145,6 +3173,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]): z 3 dtype: int64 """ + warnings.warn( + "'Series.swapaxes' is deprecated and will be removed in a future version. " + "Please use 'Series.transpose' instead.", + FutureWarning, + ) assert copy is True i = validate_axis(i) @@ -4371,6 +4404,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]): if results[0][0] is None: # This will only happen when skipna is False because we will # place nulls first. + warnings.warn( + "The behavior of Series.idxmax with all-NA values, or any-NA and skipna=False, " + "is deprecated. In a future version this will raise ValueError", + FutureWarning, + ) return np.nan values = list(results[0][1:]) if len(values) == 1: @@ -4479,6 +4517,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]): if results[0][0] is None: # This will only happen when skipna is False because we will # place nulls first. + warnings.warn( + "The behavior of Series.idxmin with all-NA values, or any-NA and skipna=False, " + "is deprecated. 
In a future version this will raise ValueError", + FutureWarning, + ) return np.nan values = list(results[0][1:]) if len(values) == 1: @@ -4805,7 +4848,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]): """ return self.index - # TODO: introduce 'method', 'limit', 'in_place'; fully support 'regex' + # TODO: introduce 'in_place'; fully support 'regex' def replace( self, to_replace: Optional[Union[Any, List, Tuple, Dict]] = None, @@ -6185,6 +6228,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]): 10 10 dtype: int64 """ + warnings.warn( + "The behavior of Series.argsort in the presence of NA values is deprecated. " + "In a future version, NA values will be ordered last instead of set to -1.", + FutureWarning, + ) notnull = self.loc[self.notnull()] sdf_for_index = notnull._internal.spark_frame.select(notnull._internal.index_spark_columns) @@ -6290,9 +6338,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]): >>> s.argmax() 3 - - >>> s.argmax(skipna=False) - -1 """ axis = validate_axis(axis, none_axis=0) if axis == 1: @@ -6317,7 +6362,16 @@ class Series(Frame, IndexOpsMixin, Generic[T]): else: max_value = results[0] # If the maximum is achieved in multiple locations, the first row position is returned. - return -1 if max_value[0] is None else max_value[1] + if max_value[0] is None: + warnings.warn( + "The behavior of Series.argmax/argmin " + "with skipna=False and NAs, or with all-NAs is deprecated. " + "In a future version this will raise ValueError.", + FutureWarning, + ) + return -1 + else: + return max_value[1] def argmin(self, axis: Axis = None, skipna: bool = True) -> int: """ @@ -6377,7 +6431,16 @@ class Series(Frame, IndexOpsMixin, Generic[T]): else: min_value = results[0] # If the maximum is achieved in multiple locations, the first row position is returned. - return -1 if min_value[0] is None else min_value[1] + if min_value[0] is None: + warnings.warn( + "The behavior of Series.argmax/argmin " + "with skipna=False and NAs, or with all-NAs is deprecated. 
" + "In a future version this will raise ValueError.", + FutureWarning, + ) + return -1 + else: + return min_value[1] def compare( self, other: "Series", keep_shape: bool = False, keep_equal: bool = False @@ -7156,6 +7219,14 @@ class Series(Frame, IndexOpsMixin, Generic[T]): ) def __getitem__(self, key: Any) -> Any: + if type(key) == int and not isinstance(self.index.spark.data_type, (IntegerType, LongType)): + warnings.warn( + "Series.__getitem__ treating keys as positions is deprecated. " + "In a future version, integer keys will always be treated as labels " + "(consistent with DataFrame behavior). " + "To access a value by position, use `ser.iloc[pos]`", + FutureWarning, + ) try: if (isinstance(key, slice) and any(type(n) == int for n in [key.start, key.stop])) or ( type(key) == int diff --git a/python/pyspark/pandas/supported_api_gen.py b/python/pyspark/pandas/supported_api_gen.py index 06591c5b26a..dfcd4267b41 100644 --- a/python/pyspark/pandas/supported_api_gen.py +++ b/python/pyspark/pandas/supported_api_gen.py @@ -98,7 +98,7 @@ def generate_supported_api(output_rst_file_path: str) -> None: Write supported APIs documentation. """ - pandas_latest_version = "2.0.3" + pandas_latest_version = "2.1.0" if LooseVersion(pd.__version__) != LooseVersion(pandas_latest_version): msg = ( "Warning: Latest version of pandas (%s) is required to generate the documentation; " diff --git a/python/pyspark/pandas/tests/computation/test_corrwith.py b/python/pyspark/pandas/tests/computation/test_corrwith.py index cf25f39b83d..4db61c15854 100644 --- a/python/pyspark/pandas/tests/computation/test_corrwith.py +++ b/python/pyspark/pandas/tests/computation/test_corrwith.py @@ -59,10 +59,7 @@ class FrameCorrwithMixin: # Therefore, we only test the pandas 1.5.0 in different way. # See https://github.com/pandas-dev/pandas/issues/48826 for the reported issue, # and https://github.com/pandas-dev/pandas/pull/46174 for the initial PR that causes. 
- if LooseVersion(pd.__version__) == LooseVersion("1.5.0") and isinstance(pobj, pd.Series): - methods = ["kendall"] - else: - methods = ["pearson", "spearman", "kendall"] + methods = ["pearson", "spearman", "kendall"] for method in methods: for drop in [True, False]: p_corr = pdf.corrwith(pobj, drop=drop, method=method) diff --git a/python/pyspark/pandas/tests/test_stats.py b/python/pyspark/pandas/tests/test_stats.py index be5340dafdc..ad5dc1fb8d8 100644 --- a/python/pyspark/pandas/tests/test_stats.py +++ b/python/pyspark/pandas/tests/test_stats.py @@ -20,11 +20,6 @@ from distutils.version import LooseVersion import numpy as np import pandas as pd -try: - from pandas._testing import makeMissingDataframe -except ImportError: - from pandas.util.testing import makeMissingDataframe - from pyspark import pandas as ps from pyspark.pandas.config import option_context from pyspark.testing.pandasutils import PandasOnSparkTestCase, SPARK_CONF_ARROW_ENABLED @@ -273,7 +268,18 @@ class StatsTestsMixin: self.assert_eq(psdf.kurt(), pdf.kurt(), almost=True) def test_dataframe_corr(self): - pdf = makeMissingDataframe(0.3, 42) + pdf = pd.DataFrame( + index=[ + "".join( + np.random.choice( + list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"), 10 + ) + ) + for _ in range(30) + ], + columns=list("ABCD"), + dtype="float64", + ) psdf = ps.from_pandas(pdf) with self.assertRaisesRegex(ValueError, "Invalid method"): @@ -347,7 +353,18 @@ class StatsTestsMixin: ) def test_series_corr(self): - pdf = makeMissingDataframe(0.3, 42) + pdf = pd.DataFrame( + index=[ + "".join( + np.random.choice( + list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"), 10 + ) + ) + for _ in range(30) + ], + columns=list("ABCD"), + dtype="float64", + ) pser1 = pdf.A pser2 = pdf.B psdf = ps.from_pandas(pdf) diff --git a/python/pyspark/pandas/typedef/typehints.py b/python/pyspark/pandas/typedef/typehints.py index e66b08b9f0b..57bfd7fcd83 100644 --- 
a/python/pyspark/pandas/typedef/typehints.py +++ b/python/pyspark/pandas/typedef/typehints.py @@ -488,7 +488,7 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp ... pass >>> inferred = infer_return_type(func) >>> inferred.dtypes - [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)] + [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)] >>> inferred.spark_type StructType([StructField('c0', LongType(), True), StructField('c1', LongType(), True)]) @@ -496,7 +496,7 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp ... pass >>> inferred = infer_return_type(func) >>> inferred.dtypes - [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)] + [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)] >>> inferred.spark_type StructType([StructField('a', LongType(), True), StructField('b', LongType(), True)]) @@ -504,7 +504,7 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp ... pass >>> inferred = infer_return_type(func) >>> inferred.dtype - CategoricalDtype(categories=[3, 4, 5], ordered=False) + CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64) >>> inferred.spark_type LongType() @@ -522,7 +522,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp ... pass >>> inferred = infer_return_type(func) >>> inferred.dtypes - [dtype('int64'), dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)] + [dtype('int64'), dtype('int64'), + CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)] >>> inferred.spark_type.simpleString() 'struct<__index_level_0__:bigint,c0:bigint,c1:bigint>' >>> inferred.index_fields @@ -534,7 +535,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp ... 
pass >>> inferred = infer_return_type(func) >>> inferred.dtypes - [CategoricalDtype(categories=[3, 4, 5], ordered=False), dtype('int64'), dtype('int64')] + [CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64), + dtype('int64'), dtype('int64')] >>> inferred.spark_type.simpleString() 'struct<index:bigint,id:bigint,A:bigint>' >>> inferred.index_fields @@ -545,7 +547,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp ... pass >>> inferred = infer_return_type(func) >>> inferred.dtypes - [dtype('int64'), dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)] + [dtype('int64'), dtype('int64'), + CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)] >>> inferred.spark_type.simpleString() 'struct<__index_level_0__:bigint,a:bigint,b:bigint>' >>> inferred.index_fields diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py index cc793f0ac6e..db98867674a 100644 --- a/python/pyspark/pandas/window.py +++ b/python/pyspark/pandas/window.py @@ -587,6 +587,10 @@ class Rolling(RollingLike[FrameLike]): ---------- quantile : float Value between 0 and 1 providing the quantile to compute. + + .. deprecated:: 4.0.0 + This will be renamed to ‘q’ in a future version. + accuracy : int, optional Default accuracy of approximation. Larger value means better accuracy. The relative error can be deduced by 1.0 / accuracy. 
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 025eed46647..7582fe86ff2 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -45,7 +45,6 @@ import pandas as pd import pyarrow as pa from pandas.api.types import ( # type: ignore[attr-defined] is_datetime64_dtype, - is_datetime64tz_dtype, is_timedelta64_dtype, ) import urllib @@ -419,7 +418,7 @@ class SparkSession: # Any timestamps must be coerced to be compatible with Spark spark_types = [ TimestampType() - if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) + if is_datetime64_dtype(t) or isinstance(t, pd.DatetimeTZDtype) else DayTimeIntervalType() if is_timedelta64_dtype(t) else None diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index f37d50f57ab..abbc9f9441f 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -384,7 +384,6 @@ class SparkConversionMixin: list list of records """ - import pandas as pd from pyspark.sql import SparkSession assert isinstance(self, SparkSession) @@ -394,7 +393,8 @@ class SparkConversionMixin: _check_series_convert_timestamps_tz_local, _get_local_timezone, ) - from pandas.core.dtypes.common import is_datetime64tz_dtype, is_timedelta64_dtype + import pandas as pd + from pandas.core.dtypes.common import is_timedelta64_dtype copied = False if isinstance(schema, StructType): @@ -493,7 +493,11 @@ class SparkConversionMixin: should_localize = not is_timestamp_ntz_preferred() for column, series in pdf.items(): s = series - if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None: + if ( + should_localize + and isinstance(s.dtype, pd.DatetimeTZDtype) + and s.dt.tz is not None + ): s = _check_series_convert_timestamps_tz_local(series, timezone) if s is not series: if not copied: @@ -590,9 +594,9 @@ class SparkConversionMixin: require_minimum_pandas_version() 
require_minimum_pyarrow_version() + import pandas as pd from pandas.api.types import ( # type: ignore[attr-defined] is_datetime64_dtype, - is_datetime64tz_dtype, ) import pyarrow as pa @@ -618,7 +622,9 @@ class SparkConversionMixin: else: # Any timestamps must be coerced to be compatible with Spark spark_types = [ - TimestampType() if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None + TimestampType() + if is_datetime64_dtype(t) or isinstance(t, pd.DatetimeTZDtype) + else None for t in pdf.dtypes ] diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index ed5884879ce..9aa2be96add 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -234,9 +234,9 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): pyarrow.Array """ import pyarrow as pa - from pandas.api.types import is_categorical_dtype + import pandas as pd - if is_categorical_dtype(series.dtype): + if isinstance(series.dtype, pd.CategoricalDtype): series = series.astype(series.dtypes.categories.dtype) if arrow_type is not None: @@ -589,9 +589,9 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer): pyarrow.Array """ import pyarrow as pa - from pandas.api.types import is_categorical_dtype + import pandas as pd - if is_categorical_dtype(series.dtype): + if isinstance(series.dtype, pd.CategoricalDtype): series = series.astype(series.dtypes.categories.dtype) if arrow_type is not None: diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py index be07f6e50e1..f4005a47357 100644 --- a/python/pyspark/sql/pandas/types.py +++ b/python/pyspark/sql/pandas/types.py @@ -246,11 +246,11 @@ def _check_series_localize_timestamps(s: "PandasSeriesLike", timezone: str) -> " require_minimum_pandas_version() - from pandas.api.types import is_datetime64tz_dtype # type: ignore[attr-defined] + import pandas as pd tz = timezone or _get_local_timezone() # TODO: handle nested 
timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(s.dtype): + if isinstance(s.dtype, pd.DatetimeTZDtype): return s.dt.tz_convert(tz).dt.tz_localize(None) else: return s @@ -278,9 +278,9 @@ def _check_series_convert_timestamps_internal( require_minimum_pandas_version() + import pandas as pd from pandas.api.types import ( # type: ignore[attr-defined] is_datetime64_dtype, - is_datetime64tz_dtype, ) # TODO: handle nested timestamps, such as ArrayType(TimestampType())? @@ -317,7 +317,7 @@ def _check_series_convert_timestamps_internal( # '2015-11-01 01:30:00-05:00' tz = timezone or _get_local_timezone() return s.dt.tz_localize(tz, ambiguous=False).dt.tz_convert("UTC") - elif is_datetime64tz_dtype(s.dtype): + elif isinstance(s.dtype, pd.DatetimeTZDtype): return s.dt.tz_convert("UTC") else: return s @@ -348,14 +348,13 @@ def _check_series_convert_timestamps_localize( import pandas as pd from pandas.api.types import ( # type: ignore[attr-defined] - is_datetime64tz_dtype, is_datetime64_dtype, ) from_tz = from_timezone or _get_local_timezone() to_tz = to_timezone or _get_local_timezone() # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(s.dtype): + if isinstance(s.dtype, pd.DatetimeTZDtype): return s.dt.tz_convert(to_tz).dt.tz_localize(None) elif is_datetime64_dtype(s.dtype) and from_tz != to_tz: # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. 
@@ -509,7 +508,6 @@ def _create_converter_to_pandas( """ import numpy as np import pandas as pd - from pandas.core.dtypes.common import is_datetime64tz_dtype pandas_type = _to_corrected_pandas_type(data_type) @@ -539,7 +537,7 @@ def _create_converter_to_pandas( assert timezone is not None def correct_dtype(pser: pd.Series) -> pd.Series: - if not is_datetime64tz_dtype(pser.dtype): + if not isinstance(pser.dtype, pd.DatetimeTZDtype): pser = pser.astype(pandas_type, copy=False) return _check_series_convert_timestamps_local_tz(pser, timezone=cast(str, timezone)) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py index b867156e71a..d5b2cf61715 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py @@ -166,7 +166,7 @@ class CogroupedApplyInPandasTestsMixin: fn=lambda lft, rgt: lft.size + rgt.size, error_class=PythonException, error_message_regex="Return type of the user-defined function " - "should be pandas.DataFrame, but is int64.", + "should be pandas.DataFrame, but is int.", ) def test_apply_in_pandas_returning_column_names(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org