Repository: spark Updated Branches: refs/heads/master b9baa4cd9 -> 0375134f4
[SPARK-5969][PySpark] Fix descending pyspark.rdd.sortByKey. The samples should always be sorted in ascending order, because bisect.bisect_left is used on them. The reverse order of the result is already achieved in rangePartitioner by reversing the found index. The current implementation also works, but always uses only two partitions -- the first one and the last one (because bisect_left returns either "beginning" or "end" for a descending sequence). Author: Milan Straka <f...@ucw.cz> This patch had conflicts when merged, resolved by Committer: Josh Rosen <joshro...@databricks.com> Closes #4761 from foxik/fix-descending-sort and squashes the following commits: 95896b5 [Milan Straka] Add regression test for SPARK-5969. 5757490 [Milan Straka] Fix descending pyspark.rdd.sortByKey. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0375134f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0375134f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0375134f Branch: refs/heads/master Commit: 0375134f42197f2e29aa865a513cda381f0a1445 Parents: b9baa4c Author: Milan Straka <f...@ucw.cz> Authored: Fri Apr 10 13:50:32 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Fri Apr 10 13:50:32 2015 -0700 ---------------------------------------------------------------------- python/pyspark/rdd.py | 2 +- python/pyspark/tests.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0375134f/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1b18789..c8e54ed 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -595,7 +595,7 @@ class RDD(object): maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner fraction 
= min(maxSampleSize / max(rddSize, 1), 1.0) samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() - samples = sorted(samples, reverse=(not ascending), key=keyfunc) + samples = sorted(samples, key=keyfunc) # we have numPartitions many parts but one of the them has # an implicit boundary http://git-wip-us.apache.org/repos/asf/spark/blob/0375134f/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 0bd5d20..0e3721b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -829,6 +829,17 @@ class RDDTests(ReusedPySparkTestCase): rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x)) rdd._jrdd.first() + def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): + # Regression test for SPARK-5969 + seq = [(i * 59 % 101, i) for i in range(101)] # unsorted sequence + rdd = self.sc.parallelize(seq) + for ascending in [True, False]: + sort = rdd.sortByKey(ascending=ascending, numPartitions=5) + self.assertEqual(sort.collect(), sorted(seq, reverse=not ascending)) + sizes = sort.glom().map(len).collect() + for size in sizes: + self.assertGreater(size, 0) + class ProfilerTests(PySparkTestCase): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org