This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 103ba7bf773 [SPARK-38793][PYTHON] Support `return_indexer` parameter of `Index/MultiIndex.sort_values`
103ba7bf773 is described below

commit 103ba7bf773ce401eab8e31ae0a2fe67a30e57f9
Author: Xinrong Meng <xinrong.m...@databricks.com>
AuthorDate: Wed Apr 13 12:17:35 2022 +0900

    [SPARK-38793][PYTHON] Support `return_indexer` parameter of `Index/MultiIndex.sort_values`
    
    ### What changes were proposed in this pull request?
    Support `return_indexer` parameter of `Index/MultiIndex.sort_values`.
    
    Note that this method returns the indexer as a pandas-on-Spark Index, while pandas returns it as a list. That's because the indexer in pandas-on-Spark may not fit in memory.
    
    ### Why are the changes needed?
    To reach parity with pandas.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The `return_indexer` parameter of `Index/MultiIndex.sort_values` is supported as below.
    
    Index
    ```py
            >>> idx = ps.Index([10, 100, 1, 1000])
            >>> idx
            Int64Index([10, 100, 1, 1000], dtype='int64')
    
            >>> idx.sort_values(ascending=False, return_indexer=True)
            (Int64Index([1000, 100, 10, 1], dtype='int64'), Int64Index([3, 1, 0, 2], dtype='int64'))
    ```
    MultiIndex
    ```py
            >>> psidx = ps.MultiIndex.from_tuples([('a', 'x', 1), ('c', 'y', 2), ('b', 'z', 3)])
            >>> psidx
            MultiIndex([('a', 'x', 1),
                        ('c', 'y', 2),
                        ('b', 'z', 3)],
                       )
    
            >>> psidx.sort_values(ascending=False, return_indexer=True)
            (MultiIndex([('c', 'y', 2),
                        ('b', 'z', 3),
                        ('a', 'x', 1)],
                       ), Int64Index([1, 2, 0], dtype='int64'))
    ```
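
    Because the indexer comes back as a pandas-on-Spark Index rather than a Python list, callers that know it is small can materialize it explicitly. A minimal sketch (not part of this patch) using `Index.to_list()`, which collects the values to the driver:
    ```py
            >>> sorted_idx, indexer = idx.sort_values(ascending=False, return_indexer=True)
            >>> indexer.to_list()  # collects the indexer to the driver; only advisable when it fits in memory
            [3, 1, 0, 2]
    ```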
    
    ### How was this patch tested?
    Unit test.
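
    For reference, one hedged way to run just the touched test case locally (assuming a working PySpark development environment; this command is not taken from the patch itself):
    ```py
    import unittest

    # Load and run only IndexesTest.test_sort_values from the pandas-on-Spark test suite.
    suite = unittest.defaultTestLoader.loadTestsFromName(
        "pyspark.pandas.tests.indexes.test_base.IndexesTest.test_sort_values"
    )
    unittest.TextTestRunner(verbosity=2).run(suite)
    ```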
    
    Closes #36076 from xinrong-databricks/index.sort_values.
    
    Authored-by: Xinrong Meng <xinrong.m...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/pandas/indexes/base.py            | 53 ++++++++++++++++++++----
 python/pyspark/pandas/series.py                  |  6 +--
 python/pyspark/pandas/tests/indexes/test_base.py | 31 +++++++++++---
 3 files changed, 73 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py
index d7119f032f3..da2bb5dc9c9 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -1543,17 +1543,24 @@ class Index(IndexOpsMixin):
 
         return result
 
-    # TODO: return_indexer
-    def sort_values(self, ascending: bool = True) -> "Index":
+    def sort_values(
+        self, return_indexer: bool = False, ascending: bool = True
+    ) -> Union["Index", Tuple["Index", "Index"]]:
         """
-        Return a sorted copy of the index.
+        Return a sorted copy of the index, and optionally return the indices that
+        sorted the index itself.
 
         .. note:: This method is not supported for pandas when index has NaN value.
                   pandas raises unexpected TypeError, but we support treating NaN
                   as the smallest value.
+                  This method returns indexer as a pandas-on-Spark index while
+                  pandas returns it as a list. That's because indexer in pandas-on-Spark
+                  may not fit in memory.
 
         Parameters
         ----------
+        return_indexer : bool, default False
+            Should the indices that would sort the index be returned.
         ascending : bool, default True
             Should the index values be sorted in an ascending order.
 
@@ -1561,6 +1568,8 @@ class Index(IndexOpsMixin):
         -------
         sorted_index : ps.Index or ps.MultiIndex
             Sorted copy of the index.
+        indexer : ps.Index
+            The indices that the index itself was sorted by.
 
         See Also
         --------
@@ -1583,6 +1592,11 @@ class Index(IndexOpsMixin):
         >>> idx.sort_values(ascending=False)
         Int64Index([1000, 100, 10, 1], dtype='int64')
 
+        Sort values in descending order, and also get the indices idx was sorted by.
+
+        >>> idx.sort_values(ascending=False, return_indexer=True)
+        (Int64Index([1000, 100, 10, 1], dtype='int64'), Int64Index([3, 1, 0, 2], dtype='int64'))
+
         Support for MultiIndex.
 
         >>> psidx = ps.MultiIndex.from_tuples([('a', 'x', 1), ('c', 'y', 2), ('b', 'z', 3)])
@@ -1603,11 +1617,20 @@ class Index(IndexOpsMixin):
                     ('b', 'z', 3),
                     ('a', 'x', 1)],
                    )
+
+        >>> psidx.sort_values(ascending=False, return_indexer=True)  # doctest: +SKIP
+        (MultiIndex([('c', 'y', 2),
+                    ('b', 'z', 3),
+                    ('a', 'x', 1)],
+                   ), Int64Index([1, 2, 0], dtype='int64'))
         """
         sdf = self._internal.spark_frame
-        sdf = sdf.orderBy(*self._internal.index_spark_columns, ascending=ascending).select(
-            self._internal.index_spark_columns
-        )
+        if return_indexer:
+            sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
+            sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
+
+        ordered_sdf = sdf.orderBy(*self._internal.index_spark_columns, ascending=ascending)
+        sdf = ordered_sdf.select(self._internal.index_spark_columns)
 
         internal = InternalFrame(
             spark_frame=sdf,
@@ -1617,7 +1640,21 @@ class Index(IndexOpsMixin):
             index_names=self._internal.index_names,
             index_fields=self._internal.index_fields,
         )
-        return DataFrame(internal).index
+        sorted_index = DataFrame(internal).index
+
+        if return_indexer:
+            alias_sequence_scol = scol_for(ordered_sdf, sequence_col).alias(
+                SPARK_DEFAULT_INDEX_NAME
+            )
+            indexer_sdf = ordered_sdf.select(alias_sequence_scol)
+            indexer_internal = InternalFrame(
+                spark_frame=indexer_sdf,
+                index_spark_columns=[scol_for(indexer_sdf, SPARK_DEFAULT_INDEX_NAME)],
+            )
+            indexer = DataFrame(indexer_internal).index
+            return sorted_index, indexer
+        else:
+            return sorted_index
 
     @no_type_check
     def sort(self, *args, **kwargs) -> None:
@@ -2136,7 +2173,7 @@ class Index(IndexOpsMixin):
         elif isinstance(self, type(other)) and not isinstance(self, MultiIndex):
             if self.name == other.name:
                 result.name = self.name
-        return result if sort is None else result.sort_values()
+        return result if sort is None else cast(Index, result.sort_values())
 
     @property
     def is_all_dates(self) -> bool:
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 9856b59947a..ced81b12e8c 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -5304,9 +5304,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         dtype: int64
         """
         if not same_anchor(self, other):
-            if get_option("compute.eager_check") and not 
self.index.sort_values().equals(
-                other.index.sort_values()
-            ):
+            if get_option("compute.eager_check") and not cast(
+                ps.Index, self.index.sort_values()
+            ).equals(cast(ps.Index, other.index.sort_values())):
                 raise ValueError("matrices are not aligned")
             elif len(self.index) != len(other.index):
                 raise ValueError("matrices are not aligned")
diff --git a/python/pyspark/pandas/tests/indexes/test_base.py b/python/pyspark/pandas/tests/indexes/test_base.py
index de138b58c68..5379e512825 100644
--- a/python/pyspark/pandas/tests/indexes/test_base.py
+++ b/python/pyspark/pandas/tests/indexes/test_base.py
@@ -802,18 +802,38 @@ class IndexesTest(ComparisonTestBase, TestUtils):
         psidx.names = ["lv", "lv"]
         self.assertRaises(ValueError, lambda: psidx.drop(["x", "y"], 
level="lv"))
 
+    def _test_sort_values(self, pidx, psidx):
+        self.assert_eq(pidx.sort_values(), psidx.sort_values())
+        # Parameter ascending
+        self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False))
+        # Parameter return_indexer
+        p_sorted, p_indexer = pidx.sort_values(return_indexer=True)
+        ps_sorted, ps_indexer = psidx.sort_values(return_indexer=True)
+        self.assert_eq(p_sorted, ps_sorted)
+        self.assert_eq(p_indexer, ps_indexer.to_list())
+        self.assert_eq(
+            pidx.sort_values(return_indexer=False), psidx.sort_values(return_indexer=False)
+        )
+        # Parameter return_indexer and ascending
+        p_sorted, p_indexer = pidx.sort_values(return_indexer=True, ascending=False)
+        ps_sorted, ps_indexer = psidx.sort_values(return_indexer=True, ascending=False)
+        self.assert_eq(p_sorted, ps_sorted)
+        self.assert_eq(p_indexer, ps_indexer.to_list())
+        self.assert_eq(
+            pidx.sort_values(return_indexer=False, ascending=False),
+            psidx.sort_values(return_indexer=False, ascending=False),
+        )
+
     def test_sort_values(self):
         pidx = pd.Index([-10, -100, 200, 100])
         psidx = ps.from_pandas(pidx)
 
-        self.assert_eq(pidx.sort_values(), psidx.sort_values())
-        self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False))
+        self._test_sort_values(pidx, psidx)
 
         pidx.name = "koalas"
         psidx.name = "koalas"
 
-        self.assert_eq(pidx.sort_values(), psidx.sort_values())
-        self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False))
+        self._test_sort_values(pidx, psidx)
 
         pidx = pd.MultiIndex.from_tuples([("a", "x", 1), ("b", "y", 2), ("c", 
"z", 3)])
         psidx = ps.from_pandas(pidx)
@@ -821,8 +841,7 @@ class IndexesTest(ComparisonTestBase, TestUtils):
         pidx.names = ["hello", "koalas", "goodbye"]
         psidx.names = ["hello", "koalas", "goodbye"]
 
-        self.assert_eq(pidx.sort_values(), psidx.sort_values())
-        self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False))
+        self._test_sort_values(pidx, psidx)
 
     def test_index_drop_duplicates(self):
         pidx = pd.Index([1, 1, 2])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
