This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f83e5ec202b [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
f83e5ec202b is described below

commit f83e5ec202be68d6640f9f9403d96a39ef993f82
Author: Haejoon Lee <haejoon....@databricks.com>
AuthorDate: Mon Sep 18 16:27:09 2023 -0700

    [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to support pandas 2.1.0 for PySpark. See [What's new in 
2.1.0](https://pandas.pydata.org/docs/dev/whatsnew/v2.1.0.html) for more detail.
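    
    For illustration only (a hedged sketch, not part of the patch): one user-visible piece of 
    this alignment is the new `DataFrame.map` alias and the matching `applymap` deprecation 
    added in `frame.py` below, assuming a working pandas-on-Spark session:
    
        import pyspark.pandas as ps
    
        psdf = ps.DataFrame([[1, 2.12], [3.356, 4.567]])
        psdf.map(lambda x: x ** 2)       # new API introduced by this change
        psdf.applymap(lambda x: x ** 2)  # still works, but now emits a FutureWarning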
    
    ### Why are the changes needed?
    
    We should follow the latest version of pandas.
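    
    As one concrete case (a sketch, assuming a pandas-on-Spark session is available), pandas 
    2.1 deprecates the `method` argument of `fillna`, and this patch mirrors that warning:
    
        import pyspark.pandas as ps
    
        psdf = ps.DataFrame({"a": [1.0, None, 3.0]})
        psdf.fillna(method="ffill")  # now emits a FutureWarning, matching pandas 2.1
        psdf.ffill()                 # recommended replacement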
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The existing CI should pass with Pandas 2.1.0.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42793 from itholic/pandas_2.1.0.
    
    Lead-authored-by: Haejoon Lee <haejoon....@databricks.com>
    Co-authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 dev/infra/Dockerfile                               |   4 +-
 .../source/migration_guide/pyspark_upgrade.rst     |   3 +
 .../docs/source/reference/pyspark.pandas/frame.rst |   1 +
 python/pyspark/pandas/base.py                      |   2 -
 python/pyspark/pandas/frame.py                     | 143 +++++++++++++++++++--
 python/pyspark/pandas/generic.py                   |  42 ++++++
 python/pyspark/pandas/groupby.py                   |  50 ++++++-
 python/pyspark/pandas/indexes/base.py              |   6 +-
 python/pyspark/pandas/indexes/datetimes.py         |  18 +++
 python/pyspark/pandas/indexes/timedelta.py         |   7 +
 python/pyspark/pandas/namespace.py                 |  21 ++-
 python/pyspark/pandas/plot/matplotlib.py           |   2 +-
 python/pyspark/pandas/series.py                    |  83 +++++++++++-
 python/pyspark/pandas/supported_api_gen.py         |   2 +-
 .../pandas/tests/computation/test_corrwith.py      |   5 +-
 python/pyspark/pandas/tests/test_stats.py          |  31 ++++-
 python/pyspark/pandas/typedef/typehints.py         |  15 ++-
 python/pyspark/pandas/window.py                    |   4 +
 python/pyspark/sql/connect/session.py              |   3 +-
 python/pyspark/sql/pandas/conversion.py            |  16 ++-
 python/pyspark/sql/pandas/serializers.py           |   8 +-
 python/pyspark/sql/pandas/types.py                 |  14 +-
 .../sql/tests/pandas/test_pandas_cogrouped_map.py  |   2 +-
 23 files changed, 415 insertions(+), 67 deletions(-)

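Editorial sketch (not part of the commit): another pandas 2.1 deprecation carried over in the
series.py changes below is the warning for integer keys used positionally on a non-integer
index; the assumed usage would look roughly like this:

    import pyspark.pandas as ps

    psser = ps.Series([10, 20, 30], index=["a", "b", "c"])
    psser[0]       # FutureWarning: treating keys as positions is deprecated
    psser.iloc[0]  # positional access, the suggested replacement
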
diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile
index 8e5a3cb7c05..d196d0e97c5 100644
--- a/dev/infra/Dockerfile
+++ b/dev/infra/Dockerfile
@@ -84,8 +84,8 @@ RUN Rscript -e "devtools::install_version('roxygen2', 
version='7.2.0', repos='ht
 # See more in SPARK-39735
 ENV R_LIBS_SITE 
"/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
 
-RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib
-RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy 
unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 
'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
+RUN pypy3 -m pip install numpy 'pandas<=2.1.0' scipy coverage matplotlib
+RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.1.0' scipy 
unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 
'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
 
 # Add Python deps for Spark Connect.
 RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 
'protobuf==3.20.3' 'googleapis-common-protos==1.56.4'
diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 3513f0a878e..d081275dc83 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -22,6 +22,7 @@ Upgrading PySpark
 Upgrading from PySpark 3.5 to 4.0
 ---------------------------------
 
+* In Spark 4.0, it is recommended to use Pandas version 2.0.0 or above with 
PySpark for optimal compatibility.
 * In Spark 4.0, the minimum supported version for Pandas has been raised from 
1.0.5 to 1.4.4 in PySpark.
 * In Spark 4.0, the minimum supported version for Numpy has been raised from 
1.15 to 1.21 in PySpark.
 * In Spark 4.0, ``Int64Index`` and ``Float64Index`` have been removed from 
pandas API on Spark, ``Index`` should be used directly.
@@ -44,6 +45,8 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and 
``ps.read_excel`` has been removed from pandas API on Spark.
 * In Spark 4.0, ``null_counts`` parameter from ``DataFrame.info`` has been 
removed from pandas API on Spark, use ``show_counts`` instead.
 * In Spark 4.0, the result of ``MultiIndex.append`` does not keep the index 
names from pandas API on Spark.
+* In Spark 4.0, ``DataFrameGroupBy.agg`` with a list of aggregation functions now respects 
``as_index=False`` in pandas API on Spark.
+* In Spark 4.0, ``DataFrame.stack`` preserves the order of existing columns 
instead of sorting them lexicographically in pandas API on Spark.
 * In Spark 4.0, ``True`` or ``False`` to ``inclusive`` parameter from 
``Series.between`` has been removed from pandas API on Spark, use ``both`` or 
``neither`` instead respectively.
 * In Spark 4.0, ``Index.asi8`` has been removed from pandas API on Spark, use 
``Index.astype`` instead.
 * In Spark 4.0, ``Index.is_type_compatible`` has been removed from pandas API 
on Spark, use ``Index.isin`` instead.
diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst 
b/python/docs/source/reference/pyspark.pandas/frame.rst
index 5f839a803d7..a22078f86e2 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -127,6 +127,7 @@ Function application, GroupBy & Window
 
    DataFrame.apply
    DataFrame.applymap
+   DataFrame.map
    DataFrame.pipe
    DataFrame.agg
    DataFrame.aggregate
diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index ef0b51f757d..ed6e983fdc8 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -978,7 +978,6 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
         NA values, such as None or numpy.NaN, get mapped to True values.
         Everything else gets mapped to False values. Characters such as empty 
strings '' or
         numpy.inf are not considered NA values
-        (unless you set pandas.options.mode.use_inf_as_na = True).
 
         Returns
         -------
@@ -1012,7 +1011,6 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
         Return a boolean same-sized object indicating if the values are not NA.
         Non-missing values get mapped to True.
         Characters such as empty strings '' or numpy.inf are not considered NA 
values
-        (unless you set pandas.options.mode.use_inf_as_na = True).
         NA values, such as None or numpy.NaN, get mapped to False values.
 
         Returns
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index b19f55e7eba..26cb15417f5 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -1271,6 +1271,8 @@ class DataFrame(Frame, Generic[T]):
         This method applies a function that accepts and returns a scalar
         to every element of a DataFrame.
 
+        .. deprecated:: 4.0.0
+
         .. note:: this API executes the function once to infer the type which 
is
              potentially expensive, for instance, when the dataset is created 
after
              aggregations or sorting.
@@ -1321,7 +1323,74 @@ class DataFrame(Frame, Generic[T]):
         0   1.000000   4.494400
         1  11.262736  20.857489
         """
+        warnings.warn(
+            "DataFrame.applymap has been deprecated. Use DataFrame.map 
instead", FutureWarning
+        )
+
+        # TODO: We can implement shortcut theoretically since it creates new 
DataFrame
+        #  anyway and we don't have to worry about operations on different 
DataFrames.
+        return self.map(func=func)
+
+    def map(self, func: Callable[[Any], Any]) -> "DataFrame":
+        """
+        Apply a function to a Dataframe elementwise.
+
+        This method applies a function that accepts and returns a scalar
+        to every element of a DataFrame.
+
+        .. versionadded:: 4.0.0
+            DataFrame.applymap was deprecated and renamed to DataFrame.map.
+
+        .. note:: this API executes the function once to infer the type which 
is
+             potentially expensive, for instance, when the dataset is created 
after
+             aggregations or sorting.
+
+             To avoid this, specify return type in ``func``, for instance, as 
below:
+
+             >>> def square(x) -> np.int32:
+             ...     return x ** 2
+
+             pandas-on-Spark uses return type hints and does not try to infer 
the type.
+
+        Parameters
+        ----------
+        func : callable
+            Python function returns a single value from a single value.
+
+        Returns
+        -------
+        DataFrame
+            Transformed DataFrame.
+
+        Examples
+        --------
+        >>> df = ps.DataFrame([[1, 2.12], [3.356, 4.567]])
+        >>> df
+               0      1
+        0  1.000  2.120
+        1  3.356  4.567
 
+        >>> def str_len(x) -> int:
+        ...     return len(str(x))
+        >>> df.map(str_len)
+           0  1
+        0  3  4
+        1  5  5
+
+        >>> def power(x) -> float:
+        ...     return x ** 2
+        >>> df.map(power)
+                   0          1
+        0   1.000000   4.494400
+        1  11.262736  20.857489
+
+        You can omit type hints and let pandas-on-Spark infer its type.
+
+        >>> df.map(lambda x: x ** 2)
+                   0          1
+        0   1.000000   4.494400
+        1  11.262736  20.857489
+        """
         # TODO: We can implement shortcut theoretically since it creates new 
DataFrame
         #  anyway and we don't have to worry about operations on different 
DataFrames.
         return self._apply_series_op(lambda psser: psser.apply(func))
@@ -5556,6 +5625,10 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         Parameters
         ----------
         data : ndarray (structured dtype), list of tuples, dict, or DataFrame
+
+            .. deprecated:: 4.0.0
+                Passing a DataFrame is deprecated.
+
         index : string, list of fields, array-like
             Field of array to use as the index, alternately a specific set of 
input labels to use
         exclude : sequence, default None
@@ -5952,6 +6025,9 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             Method to use for filling holes in reindexed Series pad / ffill: 
propagate last valid
             observation forward to next valid backfill / bfill:
             use NEXT valid observation to fill gap
+
+            .. deprecated:: 4.0.0
+
         axis : {0 or `index`}
             1 and `columns` are not supported.
         inplace : boolean, default False
@@ -5963,6 +6039,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             this is the maximum number of entries along the entire axis where 
NaNs will be filled.
             Must be greater than 0 if not None
 
+            .. deprecated:: 4.0.0
+
         Returns
         -------
         DataFrame
@@ -6046,6 +6124,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                     return psser._fillna(value=value, method=method, 
axis=axis, limit=limit)
 
         elif method is not None:
+            warnings.warn(
+                "DataFrame.fillna with 'method' is deprecated and will raise 
in a future version. "
+                "Use DataFrame.ffill() or DataFrame.bfill() instead.",
+                FutureWarning,
+            )
 
             def op(psser: ps.Series) -> ps.Series:
                 return psser._fillna(value=value, method=method, axis=axis, 
limit=limit)
@@ -6121,6 +6204,21 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             If value is a list or tuple, value should be of the same length 
with to_replace.
         inplace : boolean, default False
             Fill in place (do not create a new object)
+        limit : int, default None
+            Maximum size gap to forward or backward fill.
+
+            .. deprecated:: 4.0.0
+
+        regex : bool or str, default False
+            Whether to interpret to_replace and/or value as regular 
expressions.
+            If this is True then to_replace must be a string.
+            Alternatively, this could be a regular expression in which case 
to_replace must be None.
+        method : 'pad', default None
+            The method to use for replacement, when to_replace is a scalar, 
list or tuple and value is None.
+
+            .. deprecated:: 4.0.0
+
 
         Returns
         -------
@@ -6189,8 +6287,18 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         3     Hulk        Smash
         """
         if method != "pad":
+            warnings.warn(
+                "The 'method' keyword in DataFrame.replace is deprecated "
+                "and will be removed in a future version.",
+                FutureWarning,
+            )
             raise NotImplementedError("replace currently works only for 
method='pad")
         if limit is not None:
+            warnings.warn(
+                "The 'limit' keyword in DataFrame.replace is deprecated "
+                "and will be removed in a future version.",
+                FutureWarning,
+            )
             raise NotImplementedError("replace currently works only when 
limit=None")
         if regex is not False:
             raise NotImplementedError("replace currently doesn't supports 
regex")
@@ -6221,6 +6329,13 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                     return psser
 
         else:
+            if value is None:
+                warnings.warn(
+                    "DataFrame.replace without 'value' and with non-dict-like 
'to_replace' "
+                    "is deprecated and will raise in a future version. "
+                    "Explicitly specify the new values instead.",
+                    FutureWarning,
+                )
 
             def op(psser: ps.Series) -> ps.Series:
                 return psser.replace(to_replace=to_replace, value=value, 
regex=regex)
@@ -6344,6 +6459,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         When having a DataFrame with dates as index, this function can
         select the last few rows based on a date offset.
 
+        .. deprecated:: 4.0.0
+
         Parameters
         ----------
         offset : str or DateOffset
@@ -6383,6 +6500,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         3 observed days in the dataset, and therefore data for 2018-04-11 was
         not returned.
         """
+        warnings.warn(
+            "last is deprecated and will be removed in a future version. "
+            "Please create a mask and filter using `.loc` instead",
+            FutureWarning,
+        )
         # Check index type should be format DateTime
         if not isinstance(self.index, ps.DatetimeIndex):
             raise TypeError("'last' only supports a DatetimeIndex")
@@ -6401,6 +6523,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         When having a DataFrame with dates as index, this function can
         select the first few rows based on a date offset.
 
+        .. deprecated:: 4.0.0
+
         Parameters
         ----------
         offset : str or DateOffset
@@ -6440,6 +6564,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         3 observed days in the dataset, and therefore data for 2018-04-13 was
         not returned.
         """
+        warnings.warn(
+            "first is deprecated and will be removed in a future version. "
+            "Please create a mask and filter using `.loc` instead",
+            FutureWarning,
+        )
         # Check index type should be format DatetimeIndex
         if not isinstance(self.index, ps.DatetimeIndex):
             raise TypeError("'first' only supports a DatetimeIndex")
@@ -10527,12 +10656,12 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                 kg      m
         cat    1.0    2.0
         dog    3.0    4.0
-        >>> df_multi_level_cols2.stack().sort_index()  # doctest: +SKIP
-                height  weight
-        cat kg     NaN     1.0
-            m      2.0     NaN
-        dog kg     NaN     3.0
-            m      4.0     NaN
+        >>> df_multi_level_cols2.stack().sort_index()
+                weight  height
+        cat kg     1.0     NaN
+            m      NaN     2.0
+        dog kg     3.0     NaN
+            m      NaN     4.0
         """
         from pyspark.pandas.series import first_series
 
@@ -10558,8 +10687,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
 
             index_values.add(value)
 
-        column_labels = dict(sorted(column_labels.items(), key=lambda x: x[0]))
-
         index_name = self._internal.column_label_names[-1]
         column_label_names = self._internal.column_label_names[:-1]
         if len(column_label_names) == 0:
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index b540045f88f..81f4f6db7ed 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -1317,6 +1317,13 @@ class Frame(object, metaclass=ABCMeta):
         >>> df['b'].sum(min_count=3)
         nan
         """
+        if axis is None and isinstance(self, ps.DataFrame):
+            warnings.warn(
+                "The behavior of DataFrame.sum with axis=None is deprecated, "
+                "in a future version this will reduce over both axes and 
return a scalar. "
+                "To retain the old behavior, pass axis=0 (or do not pass 
axis)",
+                FutureWarning,
+            )
         axis = validate_axis(axis)
 
         if numeric_only is None and axis == 0:
@@ -1418,6 +1425,13 @@ class Frame(object, metaclass=ABCMeta):
         >>> ps.Series([]).prod(min_count=1)  # doctest: +SKIP
         nan
         """
+        if axis is None and isinstance(self, ps.DataFrame):
+            warnings.warn(
+                "The behavior of DataFrame.product with axis=None is 
deprecated, "
+                "in a future version this will reduce over both axes and 
return a scalar. "
+                "To retain the old behavior, pass axis=0 (or do not pass 
axis)",
+                FutureWarning,
+            )
         axis = validate_axis(axis)
 
         if numeric_only is None and axis == 0:
@@ -1870,6 +1884,13 @@ class Frame(object, metaclass=ABCMeta):
         if not isinstance(ddof, int):
             raise TypeError("ddof must be integer")
 
+        if axis is None and isinstance(self, ps.DataFrame):
+            warnings.warn(
+                "The behavior of DataFrame.std with axis=None is deprecated, "
+                "in a future version this will reduce over both axes and 
return a scalar. "
+                "To retain the old behavior, pass axis=0 (or do not pass 
axis)",
+                FutureWarning,
+            )
         axis = validate_axis(axis)
 
         if numeric_only is None and axis == 0:
@@ -1962,6 +1983,13 @@ class Frame(object, metaclass=ABCMeta):
         if not isinstance(ddof, int):
             raise TypeError("ddof must be integer")
 
+        if axis is None and isinstance(self, ps.DataFrame):
+            warnings.warn(
+                "The behavior of DataFrame.var with axis=None is deprecated, "
+                "in a future version this will reduce over both axes and 
return a scalar. "
+                "To retain the old behavior, pass axis=0 (or do not pass 
axis)",
+                FutureWarning,
+            )
         axis = validate_axis(axis)
 
         if numeric_only is None and axis == 0:
@@ -2191,6 +2219,13 @@ class Frame(object, metaclass=ABCMeta):
         if not isinstance(ddof, int):
             raise TypeError("ddof must be integer")
 
+        if axis is None and isinstance(self, ps.DataFrame):
+            warnings.warn(
+                "The behavior of DataFrame.sem with axis=None is deprecated, "
+                "in a future version this will reduce over both axes and 
return a scalar. "
+                "To retain the old behavior, pass axis=0 (or do not pass 
axis)",
+                FutureWarning,
+            )
         axis = validate_axis(axis)
 
         if numeric_only is None and axis == 0:
@@ -2448,6 +2483,8 @@ class Frame(object, metaclass=ABCMeta):
         This must be a boolean scalar value, either True or False. Raise a 
ValueError if
         the object does not have exactly 1 element, or that element is not 
boolean
 
+        .. deprecated:: 4.0.0
+
         Returns
         -------
         bool
@@ -2479,6 +2516,11 @@ class Frame(object, metaclass=ABCMeta):
           ...
         ValueError: bool cannot act on a non-boolean single element DataFrame
         """
+        warnings.warn(
+            f"{self.__class__.__name__}.bool is now deprecated "
+            "and will be removed in future version.",
+            FutureWarning,
+        )
         if isinstance(self, ps.DataFrame):
             df = self
         elif isinstance(self, ps.Series):
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index bbc292cf744..c7924fa3345 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -305,7 +305,14 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
                 i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is 
not self._psdf
             )
             if len(should_drop_index) > 0:
-                psdf = psdf.reset_index(level=should_drop_index, drop=True)
+                drop = not any(
+                    [
+                        isinstance(func_or_funcs[gkey.name], list)
+                        for gkey in self._groupkeys
+                        if gkey.name in func_or_funcs
+                    ]
+                )
+                psdf = psdf.reset_index(level=should_drop_index, drop=drop)
             if len(should_drop_index) < len(self._groupkeys):
                 psdf = psdf.reset_index()
 
@@ -701,6 +708,14 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             raise TypeError("must be real number, not %s" % type(q).__name__)
         if not 0 <= q <= 1:
             raise ValueError("'q' must be between 0 and 1. Got '%s' instead" % 
q)
+        if any(isinstance(_agg_col.spark.data_type, BooleanType) for _agg_col 
in self._agg_columns):
+            warnings.warn(
+                f"Allowing bool dtype in {self.__class__.__name__}.quantile is 
deprecated "
+                "and will raise in a future version, matching the 
Series/DataFrame behavior. "
+                "Cast to uint8 dtype before calling quantile instead.",
+                FutureWarning,
+            )
+
         return self._reduce_for_stat_function(
             lambda col: F.percentile_approx(col.cast(DoubleType()), q, 
accuracy),
             accepted_spark_types=(NumericType, BooleanType),
@@ -2293,7 +2308,6 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             should_resolve=True,
         )
 
-    # TODO: add axis parameter
     def idxmax(self, skipna: bool = True) -> FrameLike:
         """
         Return index of first occurrence of maximum over requested axis in 
group.
@@ -2376,7 +2390,6 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         )
         return self._handle_output(DataFrame(internal))
 
-    # TODO: add axis parameter
     def idxmin(self, skipna: bool = True) -> FrameLike:
         """
         Return index of first occurrence of minimum over requested axis in 
group.
@@ -2479,8 +2492,16 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             Method to use for filling holes in reindexed Series pad / ffill: 
propagate last valid
             observation forward to next valid backfill / bfill:
             use NEXT valid observation to fill gap
+
+            .. deprecated:: 4.0.0
+
         axis : {0 or `index`}
             1 and `columns` are not supported.
+
+            .. deprecated:: 4.0.0
+                For axis=1, operate on the underlying object instead.
+                Otherwise the axis keyword is not necessary.
+
         inplace : boolean, default False
             Fill in place (do not create a new object)
         limit : int, default None
@@ -2490,6 +2511,9 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             this is the maximum number of entries along the entire axis where 
NaNs will be filled.
             Must be greater than 0 if not None
 
+            .. deprecated:: 4.0.0
+
+
         Returns
         -------
         DataFrame
@@ -2527,11 +2551,19 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         2  3.0  1.0  5
         3  3.0  1.0  4
         """
+        should_resolve = method is not None
+        if should_resolve:
+            warnings.warn(
+                "DataFrameGroupBy.fillna with 'method' is deprecated "
+                "and will raise in a future version. "
+                "Use DataFrameGroupBy.ffill() or DataFrameGroupBy.bfill() 
instead.",
+                FutureWarning,
+            )
         return self._apply_series_op(
             lambda sg: sg._psser._fillna(
                 value=value, method=method, axis=axis, limit=limit, 
part_cols=sg._groupkeys_scols
             ),
-            should_resolve=(method is not None),
+            should_resolve=should_resolve,
         )
 
     def bfill(self, limit: Optional[int] = None) -> FrameLike:
@@ -3573,6 +3605,16 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
                 )
             )
         if not self._as_index:
+            column_names = [column.name for column in self._agg_columns]
+            for groupkey in self._groupkeys:
+                if groupkey.name not in column_names:
+                    warnings.warn(
+                        "A grouping was used that is not in the columns of the 
DataFrame and so "
+                        "was excluded from the result. "
+                        "This grouping will be included in a future version. "
+                        "Add the grouping as a column of the DataFrame to 
silence this warning.",
+                        FutureWarning,
+                    )
             should_drop_index = set(
                 i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is 
not self._psdf
             )
diff --git a/python/pyspark/pandas/indexes/base.py 
b/python/pyspark/pandas/indexes/base.py
index 28e98bc4cf2..48ce22b6e51 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -34,9 +34,7 @@ import pandas as pd
 import numpy as np
 from pandas.api.types import (  # type: ignore[attr-defined]
     is_list_like,
-    is_interval_dtype,
     is_bool_dtype,
-    is_categorical_dtype,
     is_integer_dtype,
     is_float_dtype,
     is_numeric_dtype,
@@ -1023,7 +1021,7 @@ class Index(IndexOpsMixin):
         >>> ps.DataFrame({'a': [1]}, index=[1]).index.is_categorical()
         False
         """
-        return is_categorical_dtype(self.dtype)
+        return isinstance(self.dtype, pd.CategoricalDtype)
 
     def is_floating(self) -> bool:
         """
@@ -1056,7 +1054,7 @@ class Index(IndexOpsMixin):
         >>> ps.DataFrame({'a': [1]}, index=[1]).index.is_interval()
         False
         """
-        return is_interval_dtype(self.dtype)
+        return isinstance(self.dtype, pd.IntervalDtype)
 
     def is_numeric(self) -> bool:
         """
diff --git a/python/pyspark/pandas/indexes/datetimes.py 
b/python/pyspark/pandas/indexes/datetimes.py
index f37661cb6a0..2c208974167 100644
--- a/python/pyspark/pandas/indexes/datetimes.py
+++ b/python/pyspark/pandas/indexes/datetimes.py
@@ -45,9 +45,15 @@ class DatetimeIndex(Index):
         inferred frequency upon creation.
     normalize : bool, default False
         Normalize start/end dates to midnight before generating date range.
+
+        .. deprecated:: 4.0.0
+
     closed : {'left', 'right'}, optional
         Set whether to include `start` and `end` that are on the
         boundary. The default includes boundary points on either end.
+
+        .. deprecated:: 4.0.0
+
     ambiguous : 'infer', bool-ndarray, 'NaT', default 'raise'
         When clocks moved backward due to DST, ambiguous times may arise.
         For example in Central European Time (UTC+01), when going from 03:00
@@ -111,6 +117,18 @@ class DatetimeIndex(Index):
         copy=False,
         name=None,
     ) -> "DatetimeIndex":
+        if closed is not None:
+            warnings.warn(
+                "The 'closed' keyword in DatetimeIndex construction is 
deprecated "
+                "and will be removed in a future version.",
+                FutureWarning,
+            )
+        if normalize is not None:
+            warnings.warn(
+                "The 'normalize' keyword in DatetimeIndex construction is 
deprecated "
+                "and will be removed in a future version.",
+                FutureWarning,
+            )
         if not is_hashable(name):
             raise TypeError("Index.name must be a hashable type")
 
diff --git a/python/pyspark/pandas/indexes/timedelta.py 
b/python/pyspark/pandas/indexes/timedelta.py
index b99c78542e7..3457ebb5bc5 100644
--- a/python/pyspark/pandas/indexes/timedelta.py
+++ b/python/pyspark/pandas/indexes/timedelta.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import warnings
 from typing import cast, no_type_check, Any
 from functools import partial
 
@@ -100,6 +101,12 @@ class TimedeltaIndex(Index):
         copy=False,
         name=None,
     ) -> "TimedeltaIndex":
+        if closed is not None:
+            warnings.warn(
+                "The 'closed' keyword in TimedeltaIndex construction is 
deprecated "
+                "and will be removed in a future version.",
+                FutureWarning,
+            )
         if not is_hashable(name):
             raise TypeError("Index.name must be a hashable type")
 
diff --git a/python/pyspark/pandas/namespace.py 
b/python/pyspark/pandas/namespace.py
index 27ff8b1d632..2f951608b72 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -43,7 +43,6 @@ import numpy as np
 import pandas as pd
 from pandas.api.types import (  # type: ignore[attr-defined]
     is_datetime64_dtype,
-    is_datetime64tz_dtype,
     is_list_like,
 )
 from pandas.tseries.offsets import DateOffset
@@ -1197,7 +1196,7 @@ def read_excel(
                 reset_index = pdf.reset_index()
                 for name, col in reset_index.items():
                     dt = col.dtype
-                    if is_datetime64_dtype(dt) or is_datetime64tz_dtype(dt):
+                    if is_datetime64_dtype(dt) or isinstance(dt, 
pd.DatetimeTZDtype):
                         continue
                     reset_index[name] = col.replace({np.nan: None})
                 pdf = reset_index
@@ -1249,6 +1248,10 @@ def read_html(
         lxml only accepts the http, FTP and file URL protocols. If you have a
         URL that starts with ``'https'`` you might try removing the ``'s'``.
 
+        .. deprecated:: 4.0.0
+            Passing html literal strings is deprecated.
+            Wrap literal string/bytes input in io.StringIO/io.BytesIO instead.
+
     match : str or compiled regular expression, optional
         The set of tables containing text matching this regex or string will be
         returned. Unless the HTML is extremely simple you will probably need to
@@ -1921,6 +1924,10 @@ def to_timedelta(
         * 'ns' / 'nanoseconds' / 'nano' / 'nanos' / 'nanosecond' / 'N'
 
         Must not be specified when `arg` context strings and 
``errors="raise"``.
+
+        .. deprecated:: 4.0.0
+            Units 'T' and 'L' are deprecated and will be removed in a future 
version.
+
     errors : {'ignore', 'raise', 'coerce'}, default 'raise'
         - If 'raise', then invalid parsing will raise an exception.
         - If 'coerce', then invalid parsing will be set as NaT.
@@ -2472,6 +2479,16 @@ def concat(
     if join not in ["inner", "outer"]:
         raise ValueError("Only can inner (intersect) or outer (union) join the 
other axis.")
 
+    if all([obj.empty for obj in objs]):
+        warnings.warn(
+            "The behavior of array concatenation with empty entries is "
+            "deprecated. In a future version, this will no longer exclude "
+            "empty items when determining the result dtype. "
+            "To retain the old behavior, exclude the empty entries before "
+            "the concat operation.",
+            FutureWarning,
+        )
+
     axis = validate_axis(axis)
     psdf: DataFrame
     if axis == 1:
diff --git a/python/pyspark/pandas/plot/matplotlib.py 
b/python/pyspark/pandas/plot/matplotlib.py
index 609ad271503..0164ec9f980 100644
--- a/python/pyspark/pandas/plot/matplotlib.py
+++ b/python/pyspark/pandas/plot/matplotlib.py
@@ -19,7 +19,7 @@ from distutils.version import LooseVersion
 
 import matplotlib as mat
 import numpy as np
-from matplotlib.axes._base import _process_plot_format
+from matplotlib.axes._base import _process_plot_format  # type: 
ignore[attr-defined]
 from pandas.core.dtypes.inference import is_list_like
 from pandas.io.formats.printing import pprint_thing
 
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 70592155012..f1b785e1b41 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -21,6 +21,7 @@ A wrapper class for Spark Column to behave like pandas Series.
 import datetime
 import re
 import inspect
+import warnings
 from collections.abc import Mapping
 from functools import partial, reduce
 from typing import (
@@ -2074,6 +2075,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             Method to use for filling holes in reindexed Series pad / ffill: 
propagate last valid
             observation forward to next valid backfill / bfill:
             use NEXT valid observation to fill gap
+
+            .. deprecated:: 4.0.0
+
         axis : {0 or `index`}
             1 and `columns` are not supported.
         inplace : boolean, default False
@@ -2085,6 +2089,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             this is the maximum number of entries along the entire axis where 
NaNs will be filled.
             Must be greater than 0 if not None
 
+            .. deprecated:: 4.0.0
+
+
         Returns
         -------
         Series
@@ -2136,6 +2143,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         psser = self._fillna(value=value, method=method, axis=axis, 
limit=limit)
 
         if method is not None:
+            warnings.warn(
+                "Series.fillna with 'method' is deprecated and will raise in a 
future version. "
+                "Use Series.ffill() or Series.bfill() instead.",
+                FutureWarning,
+            )
             psser = 
DataFrame(psser._psdf._internal.resolved_copy)._psser_for(self._column_label)
 
         inplace = validate_bool_kwarg(inplace, "inplace")
@@ -2690,6 +2702,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         When having a Series with dates as index, this function can
         select the last few elements based on a date offset.
 
+        .. deprecated:: 4.0.0
+
         Parameters
         ----------
         offset : str or DateOffset
@@ -2728,6 +2742,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         3 observed days in the dataset, and therefore data for 2018-04-11 was
         not returned.
         """
+        warnings.warn(
+            "last is deprecated and will be removed in a future version. "
+            "Please create a mask and filter using `.loc` instead",
+            FutureWarning,
+        )
         return first_series(self.to_frame().last(offset)).rename(self.name)
 
     def first(self, offset: Union[str, DateOffset]) -> "Series":
@@ -2737,6 +2756,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         When having a Series with dates as index, this function can
         select the first few elements based on a date offset.
 
+        .. deprecated:: 4.0.0
+
         Parameters
         ----------
         offset : str or DateOffset
@@ -2775,6 +2796,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         3 observed days in the dataset, and therefore data for 2018-04-13 was
         not returned.
         """
+        warnings.warn(
+            "first is deprecated and will be removed in a future version. "
+            "Please create a mask and filter using `.loc` instead",
+            FutureWarning,
+        )
         return first_series(self.to_frame().first(offset)).rename(self.name)
 
     # TODO: Categorical type isn't supported (due to PySpark's limitation) and
@@ -3120,6 +3146,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         """
         Interchange axes and swap values axes appropriately.
 
+        .. deprecated:: 4.0.0
+
         Parameters
         ----------
         i: {0 or 'index', 1 or 'columns'}. The axis to swap.
@@ -3145,6 +3173,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         z    3
         dtype: int64
         """
+        warnings.warn(
+            "'Series.swapaxes' is deprecated and will be removed in a future 
version. "
+            "Please use 'Series.transpose' instead.",
+            FutureWarning,
+        )
         assert copy is True
 
         i = validate_axis(i)
@@ -4371,6 +4404,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         if results[0][0] is None:
             # This will only happen when skipna is False because we will
             # place nulls first.
+            warnings.warn(
+                "The behavior of Series.idxmax with all-NA values, or any-NA 
and skipna=False, "
+                "is deprecated. In a future version this will raise 
ValueError",
+                FutureWarning,
+            )
             return np.nan
         values = list(results[0][1:])
         if len(values) == 1:
@@ -4479,6 +4517,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         if results[0][0] is None:
             # This will only happen when skipna is False because we will
             # place nulls first.
+            warnings.warn(
+                "The behavior of Series.idxmin with all-NA values, or any-NA 
and skipna=False, "
+                "is deprecated. In a future version this will raise 
ValueError",
+                FutureWarning,
+            )
             return np.nan
         values = list(results[0][1:])
         if len(values) == 1:
@@ -4805,7 +4848,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         """
         return self.index
 
-    # TODO: introduce 'method', 'limit', 'in_place'; fully support 'regex'
+    # TODO: introduce 'in_place'; fully support 'regex'
     def replace(
         self,
         to_replace: Optional[Union[Any, List, Tuple, Dict]] = None,
@@ -6185,6 +6228,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         10    10
         dtype: int64
         """
+        warnings.warn(
+            "The behavior of Series.argsort in the presence of NA values is 
deprecated. "
+            "In a future version, NA values will be ordered last instead of 
set to -1.",
+            FutureWarning,
+        )
         notnull = self.loc[self.notnull()]
 
         sdf_for_index = 
notnull._internal.spark_frame.select(notnull._internal.index_spark_columns)
@@ -6290,9 +6338,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 
         >>> s.argmax()
         3
-
-        >>> s.argmax(skipna=False)
-        -1
         """
         axis = validate_axis(axis, none_axis=0)
         if axis == 1:
@@ -6317,7 +6362,16 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         else:
             max_value = results[0]
             # If the maximum is achieved in multiple locations, the first row 
position is returned.
-            return -1 if max_value[0] is None else max_value[1]
+            if max_value[0] is None:
+                warnings.warn(
+                    "The behavior of Series.argmax/argmin "
+                    "with skipna=False and NAs, or with all-NAs is deprecated. 
"
+                    "In a future version this will raise ValueError.",
+                    FutureWarning,
+                )
+                return -1
+            else:
+                return max_value[1]
 
     def argmin(self, axis: Axis = None, skipna: bool = True) -> int:
         """
@@ -6377,7 +6431,16 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         else:
             min_value = results[0]
             # If the maximum is achieved in multiple locations, the first row 
position is returned.
-            return -1 if min_value[0] is None else min_value[1]
+            if min_value[0] is None:
+                warnings.warn(
+                    "The behavior of Series.argmax/argmin "
+                    "with skipna=False and NAs, or with all-NAs is deprecated. 
"
+                    "In a future version this will raise ValueError.",
+                    FutureWarning,
+                )
+                return -1
+            else:
+                return min_value[1]
 
     def compare(
         self, other: "Series", keep_shape: bool = False, keep_equal: bool = 
False
@@ -7156,6 +7219,14 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         )
 
     def __getitem__(self, key: Any) -> Any:
+        if type(key) == int and not isinstance(self.index.spark.data_type, 
(IntegerType, LongType)):
+            warnings.warn(
+                "Series.__getitem__ treating keys as positions is deprecated. "
+                "In a future version, integer keys will always be treated as 
labels "
+                "(consistent with DataFrame behavior). "
+                "To access a value by position, use `ser.iloc[pos]`",
+                FutureWarning,
+            )
         try:
             if (isinstance(key, slice) and any(type(n) == int for n in 
[key.start, key.stop])) or (
                 type(key) == int
diff --git a/python/pyspark/pandas/supported_api_gen.py 
b/python/pyspark/pandas/supported_api_gen.py
index 06591c5b26a..dfcd4267b41 100644
--- a/python/pyspark/pandas/supported_api_gen.py
+++ b/python/pyspark/pandas/supported_api_gen.py
@@ -98,7 +98,7 @@ def generate_supported_api(output_rst_file_path: str) -> None:
 
     Write supported APIs documentation.
     """
-    pandas_latest_version = "2.0.3"
+    pandas_latest_version = "2.1.0"
     if LooseVersion(pd.__version__) != LooseVersion(pandas_latest_version):
         msg = (
             "Warning: Latest version of pandas (%s) is required to generate 
the documentation; "
diff --git a/python/pyspark/pandas/tests/computation/test_corrwith.py 
b/python/pyspark/pandas/tests/computation/test_corrwith.py
index cf25f39b83d..4db61c15854 100644
--- a/python/pyspark/pandas/tests/computation/test_corrwith.py
+++ b/python/pyspark/pandas/tests/computation/test_corrwith.py
@@ -59,10 +59,7 @@ class FrameCorrwithMixin:
         # Therefore, we only test the pandas 1.5.0 in different way.
         # See https://github.com/pandas-dev/pandas/issues/48826 for the 
reported issue,
         # and https://github.com/pandas-dev/pandas/pull/46174 for the initial 
PR that causes.
-        if LooseVersion(pd.__version__) == LooseVersion("1.5.0") and 
isinstance(pobj, pd.Series):
-            methods = ["kendall"]
-        else:
-            methods = ["pearson", "spearman", "kendall"]
+        methods = ["pearson", "spearman", "kendall"]
         for method in methods:
             for drop in [True, False]:
                 p_corr = pdf.corrwith(pobj, drop=drop, method=method)
diff --git a/python/pyspark/pandas/tests/test_stats.py 
b/python/pyspark/pandas/tests/test_stats.py
index be5340dafdc..ad5dc1fb8d8 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -20,11 +20,6 @@ from distutils.version import LooseVersion
 import numpy as np
 import pandas as pd
 
-try:
-    from pandas._testing import makeMissingDataframe
-except ImportError:
-    from pandas.util.testing import makeMissingDataframe
-
 from pyspark import pandas as ps
 from pyspark.pandas.config import option_context
 from pyspark.testing.pandasutils import PandasOnSparkTestCase, 
SPARK_CONF_ARROW_ENABLED
@@ -273,7 +268,18 @@ class StatsTestsMixin:
         self.assert_eq(psdf.kurt(), pdf.kurt(), almost=True)
 
     def test_dataframe_corr(self):
-        pdf = makeMissingDataframe(0.3, 42)
+        pdf = pd.DataFrame(
+            index=[
+                "".join(
+                    np.random.choice(
+                        
list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"), 10
+                    )
+                )
+                for _ in range(30)
+            ],
+            columns=list("ABCD"),
+            dtype="float64",
+        )
         psdf = ps.from_pandas(pdf)
 
         with self.assertRaisesRegex(ValueError, "Invalid method"):
@@ -347,7 +353,18 @@ class StatsTestsMixin:
             )
 
     def test_series_corr(self):
-        pdf = makeMissingDataframe(0.3, 42)
+        pdf = pd.DataFrame(
+            index=[
+                "".join(
+                    np.random.choice(
+                        
list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"), 10
+                    )
+                )
+                for _ in range(30)
+            ],
+            columns=list("ABCD"),
+            dtype="float64",
+        )
         pser1 = pdf.A
         pser2 = pdf.B
         psdf = ps.from_pandas(pdf)
diff --git a/python/pyspark/pandas/typedef/typehints.py 
b/python/pyspark/pandas/typedef/typehints.py
index e66b08b9f0b..57bfd7fcd83 100644
--- a/python/pyspark/pandas/typedef/typehints.py
+++ b/python/pyspark/pandas/typedef/typehints.py
@@ -488,7 +488,7 @@ def infer_return_type(f: Callable) -> Union[SeriesType, 
DataFrameType, ScalarTyp
     ...     pass
     >>> inferred = infer_return_type(func)
     >>> inferred.dtypes
-    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)]
+    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False, 
categories_dtype=int64)]
     >>> inferred.spark_type
     StructType([StructField('c0', LongType(), True), StructField('c1', 
LongType(), True)])
 
@@ -496,7 +496,7 @@ def infer_return_type(f: Callable) -> Union[SeriesType, 
DataFrameType, ScalarTyp
     ...     pass
     >>> inferred = infer_return_type(func)
     >>> inferred.dtypes
-    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)]
+    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False, 
categories_dtype=int64)]
     >>> inferred.spark_type
     StructType([StructField('a', LongType(), True), StructField('b', 
LongType(), True)])
 
@@ -504,7 +504,7 @@ def infer_return_type(f: Callable) -> Union[SeriesType, 
DataFrameType, ScalarTyp
     ...     pass
     >>> inferred = infer_return_type(func)
     >>> inferred.dtype
-    CategoricalDtype(categories=[3, 4, 5], ordered=False)
+    CategoricalDtype(categories=[3, 4, 5], ordered=False, 
categories_dtype=int64)
     >>> inferred.spark_type
     LongType()
 
@@ -522,7 +522,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, 
DataFrameType, ScalarTyp
     ...     pass
     >>> inferred = infer_return_type(func)
     >>> inferred.dtypes
-    [dtype('int64'), dtype('int64'), CategoricalDtype(categories=[3, 4, 5], 
ordered=False)]
+    [dtype('int64'), dtype('int64'),
+     CategoricalDtype(categories=[3, 4, 5], ordered=False, 
categories_dtype=int64)]
     >>> inferred.spark_type.simpleString()
     'struct<__index_level_0__:bigint,c0:bigint,c1:bigint>'
     >>> inferred.index_fields
@@ -534,7 +535,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, 
DataFrameType, ScalarTyp
     ...     pass
     >>> inferred = infer_return_type(func)
     >>> inferred.dtypes
-    [CategoricalDtype(categories=[3, 4, 5], ordered=False), dtype('int64'), 
dtype('int64')]
+    [CategoricalDtype(categories=[3, 4, 5], ordered=False, 
categories_dtype=int64),
+     dtype('int64'), dtype('int64')]
     >>> inferred.spark_type.simpleString()
     'struct<index:bigint,id:bigint,A:bigint>'
     >>> inferred.index_fields
@@ -545,7 +547,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, 
DataFrameType, ScalarTyp
     ...     pass
     >>> inferred = infer_return_type(func)
     >>> inferred.dtypes
-    [dtype('int64'), dtype('int64'), CategoricalDtype(categories=[3, 4, 5], 
ordered=False)]
+    [dtype('int64'), dtype('int64'),
+     CategoricalDtype(categories=[3, 4, 5], ordered=False, 
categories_dtype=int64)]
     >>> inferred.spark_type.simpleString()
     'struct<__index_level_0__:bigint,a:bigint,b:bigint>'
     >>> inferred.index_fields
diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py
index cc793f0ac6e..db98867674a 100644
--- a/python/pyspark/pandas/window.py
+++ b/python/pyspark/pandas/window.py
@@ -587,6 +587,10 @@ class Rolling(RollingLike[FrameLike]):
         ----------
         quantile : float
             Value between 0 and 1 providing the quantile to compute.
+
+            .. deprecated:: 4.0.0
+                This will be renamed to ‘q’ in a future version.
+
         accuracy : int, optional
             Default accuracy of approximation. Larger value means better 
accuracy.
             The relative error can be deduced by 1.0 / accuracy.
diff --git a/python/pyspark/sql/connect/session.py 
b/python/pyspark/sql/connect/session.py
index 025eed46647..7582fe86ff2 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -45,7 +45,6 @@ import pandas as pd
 import pyarrow as pa
 from pandas.api.types import (  # type: ignore[attr-defined]
     is_datetime64_dtype,
-    is_datetime64tz_dtype,
     is_timedelta64_dtype,
 )
 import urllib
@@ -419,7 +418,7 @@ class SparkSession:
                 # Any timestamps must be coerced to be compatible with Spark
                 spark_types = [
                     TimestampType()
-                    if is_datetime64_dtype(t) or is_datetime64tz_dtype(t)
+                    if is_datetime64_dtype(t) or isinstance(t, 
pd.DatetimeTZDtype)
                     else DayTimeIntervalType()
                     if is_timedelta64_dtype(t)
                     else None
diff --git a/python/pyspark/sql/pandas/conversion.py 
b/python/pyspark/sql/pandas/conversion.py
index f37d50f57ab..abbc9f9441f 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -384,7 +384,6 @@ class SparkConversionMixin:
         list
             list of records
         """
-        import pandas as pd
         from pyspark.sql import SparkSession
 
         assert isinstance(self, SparkSession)
@@ -394,7 +393,8 @@ class SparkConversionMixin:
                 _check_series_convert_timestamps_tz_local,
                 _get_local_timezone,
             )
-            from pandas.core.dtypes.common import is_datetime64tz_dtype, 
is_timedelta64_dtype
+            import pandas as pd
+            from pandas.core.dtypes.common import is_timedelta64_dtype
 
             copied = False
             if isinstance(schema, StructType):
@@ -493,7 +493,11 @@ class SparkConversionMixin:
                 should_localize = not is_timestamp_ntz_preferred()
                 for column, series in pdf.items():
                     s = series
-                    if should_localize and is_datetime64tz_dtype(s.dtype) and 
s.dt.tz is not None:
+                    if (
+                        should_localize
+                        and isinstance(s.dtype, pd.DatetimeTZDtype)
+                        and s.dt.tz is not None
+                    ):
                         s = _check_series_convert_timestamps_tz_local(series, 
timezone)
                     if s is not series:
                         if not copied:
@@ -590,9 +594,9 @@ class SparkConversionMixin:
         require_minimum_pandas_version()
         require_minimum_pyarrow_version()
 
+        import pandas as pd
         from pandas.api.types import (  # type: ignore[attr-defined]
             is_datetime64_dtype,
-            is_datetime64tz_dtype,
         )
         import pyarrow as pa
 
@@ -618,7 +622,9 @@ class SparkConversionMixin:
         else:
             # Any timestamps must be coerced to be compatible with Spark
             spark_types = [
-                TimestampType() if is_datetime64_dtype(t) or 
is_datetime64tz_dtype(t) else None
+                TimestampType()
+                if is_datetime64_dtype(t) or isinstance(t, pd.DatetimeTZDtype)
+                else None
                 for t in pdf.dtypes
             ]
 
diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index ed5884879ce..9aa2be96add 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -234,9 +234,9 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         pyarrow.Array
         """
         import pyarrow as pa
-        from pandas.api.types import is_categorical_dtype
+        import pandas as pd
 
-        if is_categorical_dtype(series.dtype):
+        if isinstance(series.dtype, pd.CategoricalDtype):
             series = series.astype(series.dtypes.categories.dtype)
 
         if arrow_type is not None:
@@ -589,9 +589,9 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
         pyarrow.Array
         """
         import pyarrow as pa
-        from pandas.api.types import is_categorical_dtype
+        import pandas as pd
 
-        if is_categorical_dtype(series.dtype):
+        if isinstance(series.dtype, pd.CategoricalDtype):
             series = series.astype(series.dtypes.categories.dtype)
 
         if arrow_type is not None:
diff --git a/python/pyspark/sql/pandas/types.py 
b/python/pyspark/sql/pandas/types.py
index be07f6e50e1..f4005a47357 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -246,11 +246,11 @@ def _check_series_localize_timestamps(s: 
"PandasSeriesLike", timezone: str) -> "
 
     require_minimum_pandas_version()
 
-    from pandas.api.types import is_datetime64tz_dtype  # type: 
ignore[attr-defined]
+    import pandas as pd
 
     tz = timezone or _get_local_timezone()
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
-    if is_datetime64tz_dtype(s.dtype):
+    if isinstance(s.dtype, pd.DatetimeTZDtype):
         return s.dt.tz_convert(tz).dt.tz_localize(None)
     else:
         return s
@@ -278,9 +278,9 @@ def _check_series_convert_timestamps_internal(
 
     require_minimum_pandas_version()
 
+    import pandas as pd
     from pandas.api.types import (  # type: ignore[attr-defined]
         is_datetime64_dtype,
-        is_datetime64tz_dtype,
     )
 
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
@@ -317,7 +317,7 @@ def _check_series_convert_timestamps_internal(
         # '2015-11-01 01:30:00-05:00'
         tz = timezone or _get_local_timezone()
         return s.dt.tz_localize(tz, ambiguous=False).dt.tz_convert("UTC")
-    elif is_datetime64tz_dtype(s.dtype):
+    elif isinstance(s.dtype, pd.DatetimeTZDtype):
         return s.dt.tz_convert("UTC")
     else:
         return s
@@ -348,14 +348,13 @@ def _check_series_convert_timestamps_localize(
 
     import pandas as pd
     from pandas.api.types import (  # type: ignore[attr-defined]
-        is_datetime64tz_dtype,
         is_datetime64_dtype,
     )
 
     from_tz = from_timezone or _get_local_timezone()
     to_tz = to_timezone or _get_local_timezone()
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
-    if is_datetime64tz_dtype(s.dtype):
+    if isinstance(s.dtype, pd.DatetimeTZDtype):
         return s.dt.tz_convert(to_tz).dt.tz_localize(None)
     elif is_datetime64_dtype(s.dtype) and from_tz != to_tz:
         # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including 
NaT.
@@ -509,7 +508,6 @@ def _create_converter_to_pandas(
     """
     import numpy as np
     import pandas as pd
-    from pandas.core.dtypes.common import is_datetime64tz_dtype
 
     pandas_type = _to_corrected_pandas_type(data_type)
 
@@ -539,7 +537,7 @@ def _create_converter_to_pandas(
             assert timezone is not None
 
             def correct_dtype(pser: pd.Series) -> pd.Series:
-                if not is_datetime64tz_dtype(pser.dtype):
+                if not isinstance(pser.dtype, pd.DatetimeTZDtype):
                     pser = pser.astype(pandas_type, copy=False)
                 return _check_series_convert_timestamps_local_tz(pser, 
timezone=cast(str, timezone))
 
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
index b867156e71a..d5b2cf61715 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
@@ -166,7 +166,7 @@ class CogroupedApplyInPandasTestsMixin:
             fn=lambda lft, rgt: lft.size + rgt.size,
             error_class=PythonException,
             error_message_regex="Return type of the user-defined function "
-            "should be pandas.DataFrame, but is int64.",
+            "should be pandas.DataFrame, but is int.",
         )
 
     def test_apply_in_pandas_returning_column_names(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
