spark git commit: [SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block manager replication

2016-08-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 71afeeea4 -> 8e223ea67


[SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block 
manager replication

## What changes were proposed in this pull request?

This is a straightforward clone of JoshRosen's original patch. I have 
follow-up changes to fix block replication for repl-defined classes as well, 
but those appear to be flaking tests so I'm going to leave that for SPARK-17042

## How was this patch tested?

End-to-end test in ReplSuite (also more tests in DistributedSuite from the 
original patch).

Author: Eric Liang 

Closes #14311 from ericl/spark-16550.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e223ea6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e223ea6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e223ea6

Branch: refs/heads/master
Commit: 8e223ea67acf5aa730ccf688802f17f6fc10907c
Parents: 71afeee
Author: Eric Liang 
Authored: Mon Aug 22 16:32:14 2016 -0700
Committer: Reynold Xin 
Committed: Mon Aug 22 16:32:14 2016 -0700

--
 .../spark/serializer/SerializerManager.scala| 14 +++-
 .../org/apache/spark/storage/BlockManager.scala | 13 +++-
 .../org/apache/spark/DistributedSuite.scala | 77 ++--
 .../scala/org/apache/spark/repl/ReplSuite.scala | 14 
 4 files changed, 60 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8e223ea6/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala 
b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 9dc274c..07caadb 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -68,7 +68,7 @@ private[spark] class SerializerManager(defaultSerializer: 
Serializer, conf: Spar
* loaded yet. */
   private lazy val compressionCodec: CompressionCodec = 
CompressionCodec.createCodec(conf)
 
-  private def canUseKryo(ct: ClassTag[_]): Boolean = {
+  def canUseKryo(ct: ClassTag[_]): Boolean = {
 primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
@@ -128,8 +128,18 @@ private[spark] class SerializerManager(defaultSerializer: 
Serializer, conf: Spar
 
   /** Serializes into a chunked byte buffer. */
   def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): 
ChunkedByteBuffer = {
+dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
+  }
+
+  /** Serializes into a chunked byte buffer. */
+  def dataSerializeWithExplicitClassTag(
+  blockId: BlockId,
+  values: Iterator[_],
+  classTag: ClassTag[_]): ChunkedByteBuffer = {
 val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, 
ByteBuffer.allocate)
-dataSerializeStream(blockId, bbos, values)
+val byteStream = new BufferedOutputStream(bbos)
+val ser = getSerializer(classTag).newInstance()
+ser.serializeStream(wrapForCompression(blockId, 
byteStream)).writeAll(values).close()
 bbos.toChunkedByteBuffer
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8e223ea6/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 015e71d..fe84652 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -498,7 +498,8 @@ private[spark] class BlockManager(
 diskStore.getBytes(blockId)
   } else if (level.useMemory && memoryStore.contains(blockId)) {
 // The block was not found on disk, so serialize an in-memory copy:
-serializerManager.dataSerialize(blockId, 
memoryStore.getValues(blockId).get)
+serializerManager.dataSerializeWithExplicitClassTag(
+  blockId, memoryStore.getValues(blockId).get, info.classTag)
   } else {
 handleLocalReadFailure(blockId)
   }
@@ -973,8 +974,16 @@ private[spark] class BlockManager(
 if (level.replication > 1) {
   val remoteStartTime = System.currentTimeMillis
   val bytesToReplicate = doGetLocalBytes(blockId, info)
+  // [SPARK-16550] Erase the typed classTag when using default 
serialization, since
+  // NettyBlockRpcServer crashes when deserializing repl-defined 
classes.
+  // TODO(ekl) remove this once the classloader issue on the remote 
end is fixed.
+  val remoteClassTag = if (!s

spark git commit: [SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block manager replication

2016-08-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b65b041af -> ff2f87380


[SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block 
manager replication

## What changes were proposed in this pull request?

This is a straightforward clone of JoshRosen's original patch. I have 
follow-up changes to fix block replication for repl-defined classes as well, 
but those appear to be flaking tests so I'm going to leave that for SPARK-17042

## How was this patch tested?

End-to-end test in ReplSuite (also more tests in DistributedSuite from the 
original patch).

Author: Eric Liang 

Closes #14311 from ericl/spark-16550.

(cherry picked from commit 8e223ea67acf5aa730ccf688802f17f6fc10907c)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff2f8738
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff2f8738
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff2f8738

Branch: refs/heads/branch-2.0
Commit: ff2f873800fcc3d699e52e60fd0e69eb01d12503
Parents: b65b041
Author: Eric Liang 
Authored: Mon Aug 22 16:32:14 2016 -0700
Committer: Reynold Xin 
Committed: Mon Aug 22 16:32:22 2016 -0700

--
 .../spark/serializer/SerializerManager.scala| 14 +++-
 .../org/apache/spark/storage/BlockManager.scala | 13 +++-
 .../org/apache/spark/DistributedSuite.scala | 77 ++--
 .../scala/org/apache/spark/repl/ReplSuite.scala | 14 
 4 files changed, 60 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ff2f8738/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala 
b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 9dc274c..07caadb 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -68,7 +68,7 @@ private[spark] class SerializerManager(defaultSerializer: 
Serializer, conf: Spar
* loaded yet. */
   private lazy val compressionCodec: CompressionCodec = 
CompressionCodec.createCodec(conf)
 
-  private def canUseKryo(ct: ClassTag[_]): Boolean = {
+  def canUseKryo(ct: ClassTag[_]): Boolean = {
 primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
@@ -128,8 +128,18 @@ private[spark] class SerializerManager(defaultSerializer: 
Serializer, conf: Spar
 
   /** Serializes into a chunked byte buffer. */
   def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): 
ChunkedByteBuffer = {
+dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
+  }
+
+  /** Serializes into a chunked byte buffer. */
+  def dataSerializeWithExplicitClassTag(
+  blockId: BlockId,
+  values: Iterator[_],
+  classTag: ClassTag[_]): ChunkedByteBuffer = {
 val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, 
ByteBuffer.allocate)
-dataSerializeStream(blockId, bbos, values)
+val byteStream = new BufferedOutputStream(bbos)
+val ser = getSerializer(classTag).newInstance()
+ser.serializeStream(wrapForCompression(blockId, 
byteStream)).writeAll(values).close()
 bbos.toChunkedByteBuffer
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ff2f8738/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 83a9cbd..a89ce85 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -497,7 +497,8 @@ private[spark] class BlockManager(
 diskStore.getBytes(blockId)
   } else if (level.useMemory && memoryStore.contains(blockId)) {
 // The block was not found on disk, so serialize an in-memory copy:
-serializerManager.dataSerialize(blockId, 
memoryStore.getValues(blockId).get)
+serializerManager.dataSerializeWithExplicitClassTag(
+  blockId, memoryStore.getValues(blockId).get, info.classTag)
   } else {
 handleLocalReadFailure(blockId)
   }
@@ -972,8 +973,16 @@ private[spark] class BlockManager(
 if (level.replication > 1) {
   val remoteStartTime = System.currentTimeMillis
   val bytesToReplicate = doGetLocalBytes(blockId, info)
+  // [SPARK-16550] Erase the typed classTag when using default 
serialization, since
+  // NettyBlockRpcServer crashes when deserializing repl-defined 
classes.
+  // TODO(ekl) remove this once the classloader issue on the remote 
end is fixed.