Repository: spark
Updated Branches:
  refs/heads/master cd106b050 -> 1e35e9693
[SPARK-17817] [PYSPARK] [FOLLOWUP] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes

## What changes were proposed in this pull request?

This change is a follow-up to #15389, which calls `_to_java_object_rdd()` to solve this issue. Because that call can be expensive, we can instead decrease the batch size to solve the issue.

Simple benchmark:

    import time

    num_partitions = 20000
    a = sc.parallelize(range(int(1e6)), 2)
    start = time.time()
    l = a.repartition(num_partitions).glom().map(len).collect()
    end = time.time()
    print(end - start)

Results (seconds):

    Before:                      419.447577953
    _to_java_object_rdd():       421.916361094
    decreasing the batch size:   423.712255955

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #15445 from viirya/repartition-batch-size.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e35e969
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e35e969
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e35e969

Branch: refs/heads/master
Commit: 1e35e969305555dda02cb0788c8143e5f2e1944b
Parents: cd106b0
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Tue Oct 18 14:25:10 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Oct 18 14:25:10 2016 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e35e969/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0e2ae19..2de2c2f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2029,12 +2029,12 @@ class RDD(object):
         [[1, 2, 3, 4, 5]]
         """
         if shuffle:
-            # In Scala's repartition code, we will distribute elements evenly across output
-            # partitions. However, the RDD from Python is serialized as a single binary data,
-            # so the distribution fails and produces highly skewed partitions. We need to
-            # convert it to a RDD of java object before repartitioning.
-            data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
-            jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
+            # Decrease the batch size in order to distribute evenly the elements across output
+            # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
+            batchSize = min(10, self.ctx._batchSize or 1024)
+            ser = BatchedSerializer(PickleSerializer(), batchSize)
+            selfCopy = self._reserialize(ser)
+            jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
         else:
             jrdd = self._jrdd.coalesce(numPartitions, shuffle)
         return RDD(jrdd, self.ctx, self._jrdd_deserializer)
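For context, a minimal standalone sketch (not part of the commit) of how one might check whether `repartition()` spreads elements evenly across partitions, along the lines of the benchmark above. It assumes a local PySpark installation; the master URL, application name, and partition counts are illustrative only:

    # Minimal sketch: measure how evenly repartition() spreads elements.
    # Assumes PySpark is installed locally; names and sizes are illustrative.
    from pyspark import SparkContext

    sc = SparkContext("local[2]", "repartition-skew-check")
    rdd = sc.parallelize(range(int(1e6)), 2)
    sizes = rdd.repartition(200).glom().map(len).collect()
    # With the skew this patch addresses, a few partitions hold most of the
    # elements and the rest are empty; with the fix, sizes are roughly uniform.
    print("non-empty partitions:", sum(1 for s in sizes if s > 0))
    print("min/max partition size:", min(sizes), max(sizes))
    sc.stop()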