Repository: spark
Updated Branches:
  refs/heads/branch-1.3 f81611dca -> 9846790f4


[SPARK-6087][CORE] Provide actionable exception if Kryo buffer is not large enough

A simple try-catch wrapping KryoException to be more informative.

Author: Lev Khomich <levkhom...@gmail.com>

Closes #4947 from levkhomich/master and squashes the following commits:

0f7a947 [Lev Khomich] [SPARK-6087][CORE] Provide actionable exception if Kryo buffer is not large enough


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9846790f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9846790f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9846790f

Branch: refs/heads/branch-1.3
Commit: 9846790f49e2716e0b0c15f58e8547a1f04ba3ae
Parents: f81611d
Author: Lev Khomich <levkhom...@gmail.com>
Authored: Tue Mar 10 10:55:42 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Mar 13 18:17:02 2015 +0000

----------------------------------------------------------------------
 .../org/apache/spark/serializer/KryoSerializer.scala  |  8 +++++++-
 .../apache/spark/serializer/KryoSerializerSuite.scala | 14 ++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9846790f/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 9ce64d4..dc7aa99 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -158,7 +158,13 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
     output.clear()
-    kryo.writeClassAndObject(output, t)
+    try {
+      kryo.writeClassAndObject(output, t)
+    } catch {
+      case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
+        throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
+          "increase spark.kryoserializer.buffer.max.mb value.")
+    }
     ByteBuffer.wrap(output.toBytes)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9846790f/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 523d898..6198df8 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -261,6 +261,20 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
       ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes))
     }
   }
+
+  test("serialization buffer overflow reporting") {
+    import org.apache.spark.SparkException
+    val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max.mb"
+
+    val largeObject = (1 to 1000000).toArray
+
+    val conf = new SparkConf(false)
+    conf.set(kryoBufferMaxProperty, "1")
+
+    val ser = new KryoSerializer(conf).newInstance()
+    val thrown = intercept[SparkException](ser.serialize(largeObject))
+    assert(thrown.getMessage.contains(kryoBufferMaxProperty))
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to