This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 103ba7bf773 [SPARK-38793][PYTHON] Support `return_indexer` parameter of `Index/MultiIndex.sort_values`

103ba7bf773 is described below

commit 103ba7bf773ce401eab8e31ae0a2fe67a30e57f9
Author: Xinrong Meng <xinrong.m...@databricks.com>
AuthorDate: Wed Apr 13 12:17:35 2022 +0900

[SPARK-38793][PYTHON] Support `return_indexer` parameter of `Index/MultiIndex.sort_values`

### What changes were proposed in this pull request?

Support the `return_indexer` parameter of `Index.sort_values` and `MultiIndex.sort_values`.

Note that this method returns the indexer as a pandas-on-Spark Index, whereas pandas returns it as a NumPy array. This is because the indexer in pandas-on-Spark may not fit in memory.

### Why are the changes needed?

To reach parity with pandas.

### Does this PR introduce _any_ user-facing change?

Yes. The `return_indexer` parameter of `Index.sort_values` and `MultiIndex.sort_values` is now supported, as shown below.

To match the pandas signature, `return_indexer` is now the first positional parameter, ahead of `ascending`. A positional call such as `idx.sort_values(False)`, which previously meant `ascending=False`, now means `return_indexer=False`, so `ascending` should be passed by keyword.

Index
```py
>>> idx = ps.Index([10, 100, 1, 1000])
>>> idx
Int64Index([10, 100, 1, 1000], dtype='int64')
>>> idx.sort_values(ascending=False, return_indexer=True)
(Int64Index([1000, 100, 10, 1], dtype='int64'), Int64Index([3, 1, 0, 2], dtype='int64'))
```

MultiIndex
```py
>>> psidx = ps.MultiIndex.from_tuples([('a', 'x', 1), ('c', 'y', 2), ('b', 'z', 3)])
>>> psidx
MultiIndex([('a', 'x', 1),
            ('c', 'y', 2),
            ('b', 'z', 3)],
           )
>>> psidx.sort_values(ascending=False, return_indexer=True)
(MultiIndex([('c', 'y', 2),
            ('b', 'z', 3),
            ('a', 'x', 1)],
           ), Int64Index([1, 2, 0], dtype='int64'))
```

### How was this patch tested?

Unit tests.

Closes #36076 from xinrong-databricks/index.sort_values.
Authored-by: Xinrong Meng <xinrong.m...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/pandas/indexes/base.py | 53 ++++++++++++++++++++---- python/pyspark/pandas/series.py | 6 +-- python/pyspark/pandas/tests/indexes/test_base.py | 31 +++++++++++--- 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index d7119f032f3..da2bb5dc9c9 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -1543,17 +1543,24 @@ class Index(IndexOpsMixin): return result - # TODO: return_indexer - def sort_values(self, ascending: bool = True) -> "Index": + def sort_values( + self, return_indexer: bool = False, ascending: bool = True + ) -> Union["Index", Tuple["Index", "Index"]]: """ - Return a sorted copy of the index. + Return a sorted copy of the index, and optionally return the indices that + sorted the index itself. .. note:: This method is not supported for pandas when index has NaN value. pandas raises unexpected TypeError, but we support treating NaN as the smallest value. + This method returns indexer as a pandas-on-Spark index while + pandas returns it as a list. That's because indexer in pandas-on-Spark + may not fit in memory. Parameters ---------- + return_indexer : bool, default False + Should the indices that would sort the index be returned. ascending : bool, default True Should the index values be sorted in an ascending order. @@ -1561,6 +1568,8 @@ class Index(IndexOpsMixin): ------- sorted_index : ps.Index or ps.MultiIndex Sorted copy of the index. + indexer : ps.Index + The indices that the index itself was sorted by. See Also -------- @@ -1583,6 +1592,11 @@ class Index(IndexOpsMixin): >>> idx.sort_values(ascending=False) Int64Index([1000, 100, 10, 1], dtype='int64') + Sort values in descending order, and also get the indices idx was sorted by. 
+ + >>> idx.sort_values(ascending=False, return_indexer=True) + (Int64Index([1000, 100, 10, 1], dtype='int64'), Int64Index([3, 1, 0, 2], dtype='int64')) + Support for MultiIndex. >>> psidx = ps.MultiIndex.from_tuples([('a', 'x', 1), ('c', 'y', 2), ('b', 'z', 3)]) @@ -1603,11 +1617,20 @@ class Index(IndexOpsMixin): ('b', 'z', 3), ('a', 'x', 1)], ) + + >>> psidx.sort_values(ascending=False, return_indexer=True) # doctest: +SKIP + (MultiIndex([('c', 'y', 2), + ('b', 'z', 3), + ('a', 'x', 1)], + ), Int64Index([1, 2, 0], dtype='int64')) """ sdf = self._internal.spark_frame - sdf = sdf.orderBy(*self._internal.index_spark_columns, ascending=ascending).select( - self._internal.index_spark_columns - ) + if return_indexer: + sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__") + sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col) + + ordered_sdf = sdf.orderBy(*self._internal.index_spark_columns, ascending=ascending) + sdf = ordered_sdf.select(self._internal.index_spark_columns) internal = InternalFrame( spark_frame=sdf, @@ -1617,7 +1640,21 @@ class Index(IndexOpsMixin): index_names=self._internal.index_names, index_fields=self._internal.index_fields, ) - return DataFrame(internal).index + sorted_index = DataFrame(internal).index + + if return_indexer: + alias_sequence_scol = scol_for(ordered_sdf, sequence_col).alias( + SPARK_DEFAULT_INDEX_NAME + ) + indexer_sdf = ordered_sdf.select(alias_sequence_scol) + indexer_internal = InternalFrame( + spark_frame=indexer_sdf, + index_spark_columns=[scol_for(indexer_sdf, SPARK_DEFAULT_INDEX_NAME)], + ) + indexer = DataFrame(indexer_internal).index + return sorted_index, indexer + else: + return sorted_index @no_type_check def sort(self, *args, **kwargs) -> None: @@ -2136,7 +2173,7 @@ class Index(IndexOpsMixin): elif isinstance(self, type(other)) and not isinstance(self, MultiIndex): if self.name == other.name: result.name = self.name - return result if sort is None else 
result.sort_values() + return result if sort is None else cast(Index, result.sort_values()) @property def is_all_dates(self) -> bool: diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 9856b59947a..ced81b12e8c 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -5304,9 +5304,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]): dtype: int64 """ if not same_anchor(self, other): - if get_option("compute.eager_check") and not self.index.sort_values().equals( - other.index.sort_values() - ): + if get_option("compute.eager_check") and not cast( + ps.Index, self.index.sort_values() + ).equals(cast(ps.Index, other.index.sort_values())): raise ValueError("matrices are not aligned") elif len(self.index) != len(other.index): raise ValueError("matrices are not aligned") diff --git a/python/pyspark/pandas/tests/indexes/test_base.py b/python/pyspark/pandas/tests/indexes/test_base.py index de138b58c68..5379e512825 100644 --- a/python/pyspark/pandas/tests/indexes/test_base.py +++ b/python/pyspark/pandas/tests/indexes/test_base.py @@ -802,18 +802,38 @@ class IndexesTest(ComparisonTestBase, TestUtils): psidx.names = ["lv", "lv"] self.assertRaises(ValueError, lambda: psidx.drop(["x", "y"], level="lv")) + def _test_sort_values(self, pidx, psidx): + self.assert_eq(pidx.sort_values(), psidx.sort_values()) + # Parameter ascending + self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False)) + # Parameter return_indexer + p_sorted, p_indexer = pidx.sort_values(return_indexer=True) + ps_sorted, ps_indexer = psidx.sort_values(return_indexer=True) + self.assert_eq(p_sorted, ps_sorted) + self.assert_eq(p_indexer, ps_indexer.to_list()) + self.assert_eq( + pidx.sort_values(return_indexer=False), psidx.sort_values(return_indexer=False) + ) + # Parameter return_indexer and ascending + p_sorted, p_indexer = pidx.sort_values(return_indexer=True, ascending=False) + ps_sorted, ps_indexer = 
psidx.sort_values(return_indexer=True, ascending=False) + self.assert_eq(p_sorted, ps_sorted) + self.assert_eq(p_indexer, ps_indexer.to_list()) + self.assert_eq( + pidx.sort_values(return_indexer=False, ascending=False), + psidx.sort_values(return_indexer=False, ascending=False), + ) + def test_sort_values(self): pidx = pd.Index([-10, -100, 200, 100]) psidx = ps.from_pandas(pidx) - self.assert_eq(pidx.sort_values(), psidx.sort_values()) - self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False)) + self._test_sort_values(pidx, psidx) pidx.name = "koalas" psidx.name = "koalas" - self.assert_eq(pidx.sort_values(), psidx.sort_values()) - self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False)) + self._test_sort_values(pidx, psidx) pidx = pd.MultiIndex.from_tuples([("a", "x", 1), ("b", "y", 2), ("c", "z", 3)]) psidx = ps.from_pandas(pidx) @@ -821,8 +841,7 @@ class IndexesTest(ComparisonTestBase, TestUtils): pidx.names = ["hello", "koalas", "goodbye"] psidx.names = ["hello", "koalas", "goodbye"] - self.assert_eq(pidx.sort_values(), psidx.sort_values()) - self.assert_eq(pidx.sort_values(ascending=False), psidx.sort_values(ascending=False)) + self._test_sort_values(pidx, psidx) def test_index_drop_duplicates(self): pidx = pd.Index([1, 1, 2]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org