Repository: spark
Updated Branches:
  refs/heads/branch-1.2 7a1583917 -> daec1c635


[SPARK-5969][PySpark] Fix descending pyspark.rdd.sortByKey.

The samples should always be sorted in ascending order, because 
bisect.bisect_left is used on them. The reverse order of the result is already 
achieved in rangePartitioner by reversing the found index.
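
A minimal sketch of that indexing logic, written for illustration only and not
copied from rdd.py. The helper name partition_index, the boundary values and the
key range are assumptions; only bisect.bisect_left and the "reverse the found
index" step come from the description above. It compares ascending boundaries
with descending ones for a descending sort:

```python
import bisect


def partition_index(key, bounds, num_partitions, ascending=True):
    # Hypothetical stand-in for the rangePartitioner closure in sortByKey:
    # locate the key among the boundaries, then mirror the index when the
    # requested order is descending.
    p = bisect.bisect_left(bounds, key)
    return p if ascending else num_partitions - 1 - p


num_partitions = 5
keys = list(range(100))            # illustrative keys, not Spark data
asc_bounds = [20, 40, 60, 80]      # boundaries sorted ascending (fixed behaviour)
desc_bounds = asc_bounds[::-1]     # boundaries sorted descending (old behaviour)

# Partitions that actually receive keys in a descending sort.
fixed = {partition_index(k, asc_bounds, num_partitions, ascending=False)
         for k in keys}
buggy = {partition_index(k, desc_bounds, num_partitions, ascending=False)
         for k in keys}

print(sorted(fixed))
print(sorted(buggy))
```

With ascending boundaries every partition index is reachable. With descending
boundaries bisect_left can only return 0 or len(bounds), so only the first and
the last partition receive any keys.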

The current implementation also works, but always uses only two partitions -- 
the first one and the last one (because bisect_left returns either 
"beginning" or "end" for a descending sequence).

Author: Milan Straka <f...@ucw.cz>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshro...@databricks.com>

Closes #4761 from foxik/fix-descending-sort and squashes the following commits:

95896b5 [Milan Straka] Add regression test for SPARK-5969.
5757490 [Milan Straka] Fix descending pyspark.rdd.sortByKey.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/daec1c63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/daec1c63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/daec1c63

Branch: refs/heads/branch-1.2
Commit: daec1c6353e5e4daac2f082f714e45a95939a538
Parents: 7a15839
Author: Milan Straka <f...@ucw.cz>
Authored: Fri Apr 10 13:50:32 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Fri Apr 10 15:21:50 2015 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py   |  2 +-
 python/pyspark/tests.py | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/daec1c63/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5f7806b..9463519 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -587,7 +587,7 @@ class RDD(object):
         maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
         fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
         samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
-        samples = sorted(samples, reverse=(not ascending), key=keyfunc)
+        samples = sorted(samples, key=keyfunc)
 
         # we have numPartitions many parts but one of the them has
         # an implicit boundary

http://git-wip-us.apache.org/repos/asf/spark/blob/daec1c63/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 2e490a0..7cb4645 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -744,6 +744,17 @@ class RDDTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
+    def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
+        # Regression test for SPARK-5969
+        seq = [(i * 59 % 101, i) for i in range(101)]  # unsorted sequence
+        rdd = self.sc.parallelize(seq)
+        for ascending in [True, False]:
+            sort = rdd.sortByKey(ascending=ascending, numPartitions=5)
+            self.assertEqual(sort.collect(), sorted(seq, reverse=not ascending))
+            sizes = sort.glom().map(len).collect()
+            for size in sizes:
+                self.assertGreater(size, 0)
+
 
 class ProfilerTests(PySparkTestCase):
 

