Repository: spark
Updated Branches:
  refs/heads/branch-1.5 63922fa4d -> 71aa54755


[SPARK-10128] [STREAMING] Used correct classloader to deserialize WAL data

Recovering Kinesis sequence numbers from WAL leads to classnotfoundexception 
because the ObjectInputStream does not use the correct classloader and the 
SequenceNumberRanges class (in streaming-kinesis-asl package) cannot be found 
(added through spark-submit) while deserializing. The solution is to use 
`Thread.currentThread().getContextClassLoader` while deserializing.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #8328 from tdas/SPARK-10128 and squashes the following commits:

f19b1c2 [Tathagata Das] Used correct classloader to deserialize WAL data

(cherry picked from commit b762f9920f7587d3c08493c49dd2fede62110b88)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71aa5475
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71aa5475
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71aa5475

Branch: refs/heads/branch-1.5
Commit: 71aa5475597f4220e2bab6b42caf9b98f248ac99
Parents: 63922fa
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Aug 19 21:15:58 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Aug 19 21:16:17 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/71aa5475/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 7720259..53b96d5 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
 import org.apache.spark.util.{Clock, Utils}
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf}
 
 /** Trait representing any event in the ReceivedBlockTracker that updates its 
state. */
 private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -199,7 +199,8 @@ private[streaming] class ReceivedBlockTracker(
       import scala.collection.JavaConversions._
       writeAheadLog.readAll().foreach { byteBuffer =>
         logTrace("Recovering record " + byteBuffer)
-        Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) 
match {
+        Utils.deserialize[ReceivedBlockTrackerLogEvent](
+          byteBuffer.array, Thread.currentThread().getContextClassLoader) 
match {
           case BlockAdditionEvent(receivedBlockInfo) =>
             insertAddedBlock(receivedBlockInfo)
           case BatchAllocationEvent(time, allocatedBlocks) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to