Repository: spark
Updated Branches:
  refs/heads/master 3c1f1baaf -> 852f4de2d


[SPARK-7873] Allow KryoSerializerInstance to create multiple streams at the 
same time

This is a somewhat obscure bug, but I think that it will seriously impact 
KryoSerializer users who use custom registrators which disabled auto-reset. 
When auto-reset is disabled, then this breaks things in some of our shuffle 
paths which actually end up creating multiple OutputStreams from the same 
shared SerializerInstance (which is unsafe).

This was introduced by a patch (SPARK-3386) which enables serializer re-use in 
some of the shuffle paths, since constructing new serializer instances is 
actually pretty costly for KryoSerializer.  We had already fixed another 
corner-case (SPARK-7766) bug related to this, but missed this one.

I think that the root problem here is that KryoSerializerInstance can be used 
in a way which is unsafe even within a single thread, e.g. by creating multiple 
open OutputStreams from the same instance or by interleaving deserialize and 
deserializeStream calls. I considered a smaller patch which adds assertions to 
guard against this type of "misuse" but abandoned that approach after I 
realized how convoluted the Scaladoc became.

This patch fixes this bug by making it legal to create multiple streams from 
the same KryoSerializerInstance.  Internally, KryoSerializerInstance now 
implements a  `borrowKryo()` / `releaseKryo()` API that's backed by a "pool" of 
capacity 1. Each call to a KryoSerializerInstance method will borrow the Kryo, 
do its work, then release the serializer instance back to the pool. If the pool 
is empty and we need an instance, it will allocate a new Kryo on-demand. This 
makes it safe for multiple OutputStreams to be opened from the same serializer. 
If we try to release a Kryo back to the pool but the pool already contains a 
Kryo, then we'll just discard the new Kryo. I don't think there's a clear 
benefit to having a larger pool since our usages tend to fall into two cases, 
a) where we only create a single OutputStream and b) where we create a huge 
number of OutputStreams with the same lifecycle, then destroy the 
KryoSerializerInstance (this is what's happening in the bypassMergeSort code pat
 h that my regression test hits).

Author: Josh Rosen <joshro...@databricks.com>

Closes #6415 from JoshRosen/SPARK-7873 and squashes the following commits:

00b402e [Josh Rosen] Initialize eagerly to fix a failing test
ba55d20 [Josh Rosen] Add explanatory comments
3f1da96 [Josh Rosen] Guard against duplicate close()
ab457ca [Josh Rosen] Sketch a loan/release based solution.
9816e8f [Josh Rosen] Add a failing test showing how deserialize() and 
deserializeStream() can interfere.
7350886 [Josh Rosen] Add failing regression test for SPARK-7873


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/852f4de2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/852f4de2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/852f4de2

Branch: refs/heads/master
Commit: 852f4de2d3d0c5fff2fa66000a7a3088bb3dbe74
Parents: 3c1f1ba
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed May 27 20:19:53 2015 -0700
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Wed May 27 20:19:53 2015 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 129 +++++++++++++++----
 .../apache/spark/serializer/Serializer.scala    |   5 +
 .../spark/serializer/KryoSerializerSuite.scala  |  37 +++++-
 3 files changed, 147 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/852f4de2/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 2179579..3f90988 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.serializer
 
-import java.io.{EOFException, InputStream, OutputStream}
+import java.io.{EOFException, IOException, InputStream, OutputStream}
 import java.nio.ByteBuffer
+import javax.annotation.Nullable
 
 import scala.reflect.ClassTag
 
@@ -136,21 +137,45 @@ class KryoSerializer(conf: SparkConf)
 }
 
 private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends 
SerializationStream {
-  val output = new KryoOutput(outStream)
+class KryoSerializationStream(
+    serInstance: KryoSerializerInstance,
+    outStream: OutputStream) extends SerializationStream {
+
+  private[this] var output: KryoOutput = new KryoOutput(outStream)
+  private[this] var kryo: Kryo = serInstance.borrowKryo()
 
   override def writeObject[T: ClassTag](t: T): SerializationStream = {
     kryo.writeClassAndObject(output, t)
     this
   }
 
-  override def flush() { output.flush() }
-  override def close() { output.close() }
+  override def flush() {
+    if (output == null) {
+      throw new IOException("Stream is closed")
+    }
+    output.flush()
+  }
+
+  override def close() {
+    if (output != null) {
+      try {
+        output.close()
+      } finally {
+        serInstance.releaseKryo(kryo)
+        kryo = null
+        output = null
+      }
+    }
+  }
 }
 
 private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends 
DeserializationStream {
-  private val input = new KryoInput(inStream)
+class KryoDeserializationStream(
+    serInstance: KryoSerializerInstance,
+    inStream: InputStream) extends DeserializationStream {
+
+  private[this] var input: KryoInput = new KryoInput(inStream)
+  private[this] var kryo: Kryo = serInstance.borrowKryo()
 
   override def readObject[T: ClassTag](): T = {
     try {
@@ -163,52 +188,105 @@ class KryoDeserializationStream(kryo: Kryo, inStream: 
InputStream) extends Deser
   }
 
   override def close() {
-    // Kryo's Input automatically closes the input stream it is using.
-    input.close()
+    if (input != null) {
+      try {
+        // Kryo's Input automatically closes the input stream it is using.
+        input.close()
+      } finally {
+        serInstance.releaseKryo(kryo)
+        kryo = null
+        input = null
+      }
+    }
   }
 }
 
 private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends 
SerializerInstance {
-  private val kryo = ks.newKryo()
 
-  // Make these lazy vals to avoid creating a buffer unless we use them
+  /**
+   * A re-used [[Kryo]] instance. Methods will borrow this instance by calling 
`borrowKryo()`, do
+   * their work, then release the instance by calling `releaseKryo()`. 
Logically, this is a caching
+   * pool of size one. SerializerInstances are not thread-safe, hence accesses 
to this field are
+   * not synchronized.
+   */
+  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+
+  /**
+   * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached 
Kryo instance;
+   * otherwise, it allocates a new instance.
+   */
+  private[serializer] def borrowKryo(): Kryo = {
+    if (cachedKryo != null) {
+      val kryo = cachedKryo
+      // As a defensive measure, call reset() to clear any Kryo state that 
might have been modified
+      // by the last operation to borrow this instance (see SPARK-7766 for 
discussion of this issue)
+      kryo.reset()
+      cachedKryo = null
+      kryo
+    } else {
+      ks.newKryo()
+    }
+  }
+
+  /**
+   * Release a borrowed [[Kryo]] instance. If this serializer instance already 
has a cached Kryo
+   * instance, then the given Kryo instance is discarded; otherwise, the Kryo 
is stored for later
+   * re-use.
+   */
+  private[serializer] def releaseKryo(kryo: Kryo): Unit = {
+    if (cachedKryo == null) {
+      cachedKryo = kryo
+    }
+  }
+
+  // Make these lazy vals to avoid creating a buffer unless we use them.
   private lazy val output = ks.newKryoOutput()
   private lazy val input = new KryoInput()
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
     output.clear()
-    kryo.reset() // We must reset in case this serializer instance was reused 
(see SPARK-7766)
+    val kryo = borrowKryo()
     try {
       kryo.writeClassAndObject(output, t)
     } catch {
       case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
         throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. 
To avoid this, " +
           "increase spark.kryoserializer.buffer.max value.")
+    } finally {
+      releaseKryo(kryo)
     }
     ByteBuffer.wrap(output.toBytes)
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
-    input.setBuffer(bytes.array)
-    kryo.readClassAndObject(input).asInstanceOf[T]
+    val kryo = borrowKryo()
+    try {
+      input.setBuffer(bytes.array)
+      kryo.readClassAndObject(input).asInstanceOf[T]
+    } finally {
+      releaseKryo(kryo)
+    }
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: 
ClassLoader): T = {
+    val kryo = borrowKryo()
     val oldClassLoader = kryo.getClassLoader
-    kryo.setClassLoader(loader)
-    input.setBuffer(bytes.array)
-    val obj = kryo.readClassAndObject(input).asInstanceOf[T]
-    kryo.setClassLoader(oldClassLoader)
-    obj
+    try {
+      kryo.setClassLoader(loader)
+      input.setBuffer(bytes.array)
+      kryo.readClassAndObject(input).asInstanceOf[T]
+    } finally {
+      kryo.setClassLoader(oldClassLoader)
+      releaseKryo(kryo)
+    }
   }
 
   override def serializeStream(s: OutputStream): SerializationStream = {
-    kryo.reset() // We must reset in case this serializer instance was reused 
(see SPARK-7766)
-    new KryoSerializationStream(kryo, s)
+    new KryoSerializationStream(this, s)
   }
 
   override def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
+    new KryoDeserializationStream(this, s)
   }
 
   /**
@@ -218,7 +296,12 @@ private[spark] class KryoSerializerInstance(ks: 
KryoSerializer) extends Serializ
   def getAutoReset(): Boolean = {
     val field = classOf[Kryo].getDeclaredField("autoReset")
     field.setAccessible(true)
-    field.get(kryo).asInstanceOf[Boolean]
+    val kryo = borrowKryo()
+    try {
+      field.get(kryo).asInstanceOf[Boolean]
+    } finally {
+      releaseKryo(kryo)
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/852f4de2/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 6078c9d..f1bdff9 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.serializer
 
 import java.io._
 import java.nio.ByteBuffer
+import javax.annotation.concurrent.NotThreadSafe
 
 import scala.reflect.ClassTag
 
@@ -114,8 +115,12 @@ object Serializer {
 /**
  * :: DeveloperApi ::
  * An instance of a serializer, for use by one thread at a time.
+ *
+ * It is legal to create multiple serialization / deserialization streams from 
the same
+ * SerializerInstance as long as those streams are all used within the same 
thread.
  */
 @DeveloperApi
+@NotThreadSafe
 abstract class SerializerInstance {
   def serialize[T: ClassTag](t: T): ByteBuffer
 

http://git-wip-us.apache.org/repos/asf/spark/blob/852f4de2/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 8c384bd..ef50bc9 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -361,6 +361,41 @@ class KryoSerializerSuite extends FunSuite with 
SharedSparkContext {
   }
 }
 
+class KryoSerializerAutoResetDisabledSuite extends FunSuite with 
SharedSparkContext {
+  conf.set("spark.serializer", classOf[KryoSerializer].getName)
+  conf.set("spark.kryo.registrator", 
classOf[RegistratorWithoutAutoReset].getName)
+  conf.set("spark.kryo.referenceTracking", "true")
+  conf.set("spark.shuffle.manager", "sort")
+  conf.set("spark.shuffle.sort.bypassMergeThreshold", "200")
+
+  test("sort-shuffle with bypassMergeSort (SPARK-7873)") {
+    val myObject = ("Hello", "World")
+    
assert(sc.parallelize(Seq.fill(100)(myObject)).repartition(2).collect().toSet 
=== Set(myObject))
+  }
+
+  test("calling deserialize() after deserializeStream()") {
+    val serInstance = new 
KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
+    assert(!serInstance.getAutoReset())
+    val hello = "Hello"
+    val world = "World"
+    // Here, we serialize the same value twice, so the reference-tracking 
should cause us to store
+    // references to some of these values
+    val helloHello = serInstance.serialize((hello, hello))
+    // Here's a stream which only contains one value
+    val worldWorld: Array[Byte] = {
+      val baos = new ByteArrayOutputStream()
+      val serStream = serInstance.serializeStream(baos)
+      serStream.writeObject(world)
+      serStream.writeObject(world)
+      serStream.close()
+      baos.toByteArray
+    }
+    val deserializationStream = serInstance.deserializeStream(new 
ByteArrayInputStream(worldWorld))
+    assert(deserializationStream.readValue[Any]() === world)
+    deserializationStream.close()
+    assert(serInstance.deserialize[Any](helloHello) === (hello, hello))
+  }
+}
 
 class ClassLoaderTestingObject
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to