Repository: spark
Updated Branches:
  refs/heads/master 5fa9f8795 -> 4f1dcd3dc


[SPARK-18051][SPARK CORE] Fix bug where a custom PartitionCoalescer causes a serialization exception

## What changes were proposed in this pull request?

Add a `require` check in `CoalescedRDD` to make sure the passed-in
`partitionCoalescer` is serializable, and update the documentation for the
`RDD.coalesce` API accordingly. A sketch of a conforming coalescer is shown below.
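
For illustration, here is a minimal sketch of a custom `PartitionCoalescer` that satisfies the new check. The `RoundRobinCoalescer` name and its packing strategy are hypothetical and not part of this patch; the point is simply that the class mixes in `Serializable`:

```scala
import org.apache.spark.Partition
import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// Hypothetical coalescer: packs the parent's partitions round-robin into
// at most `maxPartitions` groups. Mixing in Serializable is exactly what
// the new require check in CoalescedRDD enforces.
class RoundRobinCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val numGroups = math.min(maxPartitions, parent.partitions.length)
    val groups = Array.fill(numGroups)(new PartitionGroup())
    parent.partitions.zipWithIndex.foreach { case (part, i) =>
      groups(i % numGroups).partitions += part
    }
    groups
  }
}
```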

## How was this patch tested?

Manual. (Test code in JIRA [SPARK-18051].)

Author: WeichenXu <weichenxu...@outlook.com>

Closes #15587 from WeichenXu123/fix_coalescer_bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f1dcd3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f1dcd3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f1dcd3d

Branch: refs/heads/master
Commit: 4f1dcd3dce270268b42fbe59409790364fa5c5df
Parents: 5fa9f87
Author: WeichenXu <weichenxu...@outlook.com>
Authored: Sat Oct 22 11:59:28 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Oct 22 11:59:28 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 4 ++++
 core/src/main/scala/org/apache/spark/rdd/RDD.scala          | 3 ++-
 2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f1dcd3d/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 9c198a6..2cba1fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -80,6 +80,10 @@ private[spark] class CoalescedRDD[T: ClassTag](
 
   require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
     s"Number of partitions ($maxPartitions) must be positive.")
+  if (partitionCoalescer.isDefined) {
+    require(partitionCoalescer.get.isInstanceOf[Serializable],
+      "The partition coalescer passed in must be serializable.")
+  }
 
   override def getPartitions: Array[Partition] = {
     val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())

http://git-wip-us.apache.org/repos/asf/spark/blob/4f1dcd3d/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index be11957..db535de 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -432,7 +432,8 @@ abstract class RDD[T: ClassTag](
    * of partitions. This is useful if you have a small number of partitions,
    * say 100, potentially with a few partitions being abnormally large. Calling
    * coalesce(1000, shuffle = true) will result in 1000 partitions with the
-   * data distributed using a hash partitioner.
+   * data distributed using a hash partitioner. The optional partition coalescer
+   * passed in must be serializable.
    */
   def coalesce(numPartitions: Int, shuffle: Boolean = false,
                partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

