Repository: spark Updated Branches: refs/heads/master 5fa9f8795 -> 4f1dcd3dc
[SPARK-18051][SPARK CORE] fix bug of custom PartitionCoalescer causing serialization exception ## What changes were proposed in this pull request? add a require check in `CoalescedRDD` to make sure the passed in `partitionCoalescer` to be `serializable`. and update the document for api `RDD.coalesce` ## How was this patch tested? Manual.(test code in jira [SPARK-18051]) Author: WeichenXu <weichenxu...@outlook.com> Closes #15587 from WeichenXu123/fix_coalescer_bug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f1dcd3d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f1dcd3d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f1dcd3d Branch: refs/heads/master Commit: 4f1dcd3dce270268b42fbe59409790364fa5c5df Parents: 5fa9f87 Author: WeichenXu <weichenxu...@outlook.com> Authored: Sat Oct 22 11:59:28 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Sat Oct 22 11:59:28 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 4 ++++ core/src/main/scala/org/apache/spark/rdd/RDD.scala | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4f1dcd3d/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 9c198a6..2cba1fe 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -80,6 +80,10 @@ private[spark] class CoalescedRDD[T: ClassTag]( require(maxPartitions > 0 || maxPartitions == prev.partitions.length, s"Number of partitions ($maxPartitions) must be positive.") + if (partitionCoalescer.isDefined) { + require(partitionCoalescer.get.isInstanceOf[Serializable], + "The partition coalescer passed in must be serializable.") + } override def getPartitions: Array[Partition] = { val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer()) http://git-wip-us.apache.org/repos/asf/spark/blob/4f1dcd3d/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index be11957..db535de 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -432,7 +432,8 @@ abstract class RDD[T: ClassTag]( * of partitions. This is useful if you have a small number of partitions, * say 100, potentially with a few partitions being abnormally large. Calling * coalesce(1000, shuffle = true) will result in 1000 partitions with the - * data distributed using a hash partitioner. + * data distributed using a hash partitioner. The optional partition coalescer + passed in must be serializable. */ def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org