Repository: spark
Updated Branches:
  refs/heads/master 6ce008ba4 -> 3de24ae2e


http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9e08675..d3bfb00 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -52,14 +53,16 @@ public final class UnsafeKVExternalSorter {
       StructType keySchema,
       StructType valueSchema,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       long pageSizeBytes) throws IOException {
-    this(keySchema, valueSchema, blockManager, pageSizeBytes, null);
+    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
   }
 
   public UnsafeKVExternalSorter(
       StructType keySchema,
       StructType valueSchema,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       long pageSizeBytes,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
@@ -77,6 +80,7 @@ public final class UnsafeKVExternalSorter {
       sorter = UnsafeExternalSorter.create(
         taskMemoryManager,
         blockManager,
+        serializerManager,
         taskContext,
         recordComparator,
         prefixComparator,
@@ -116,6 +120,7 @@ public final class UnsafeKVExternalSorter {
       sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
         taskMemoryManager,
         blockManager,
+        serializerManager,
         taskContext,
         new KVComparator(ordering, keySchema.length()),
         prefixComparator,
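
For reference, a minimal caller-side sketch (Scala, not part of the patch) of the new constructor shape, mirroring the UnsafeKVExternalSorterSuite change further below; the package line, schema fields, and the surrounding task are illustrative assumptions:

    package org.apache.spark.sql.execution

    import org.apache.spark.{SparkEnv, TaskContext}
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    object UnsafeKVExternalSorterSketch {
      // Assumes it runs inside a task, so TaskContext.get() and SparkEnv.get are non-null.
      def buildSorter(): UnsafeKVExternalSorter = {
        val keySchema = new StructType().add("k", LongType)      // placeholder key schema
        val valueSchema = new StructType().add("v", StringType)  // placeholder value schema
        val pageSize = TaskContext.get().taskMemoryManager().pageSizeBytes
        new UnsafeKVExternalSorter(
          keySchema,
          valueSchema,
          SparkEnv.get.blockManager,
          SparkEnv.get.serializerManager,  // argument introduced by this commit
          pageSize)
      }
    }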

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index a4c0e1c..270c09a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -339,6 +339,7 @@ case class Window(
                 sorter = UnsafeExternalSorter.create(
                   TaskContext.get().taskMemoryManager(),
                   SparkEnv.get.blockManager,
+                  SparkEnv.get.serializerManager,
                   TaskContext.get(),
                   null,
                   null,

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index c74ac8a..233ac26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -399,6 +399,7 @@ private[sql] class DynamicPartitionWriterContainer(
       sortingKeySchema,
       StructType.fromAttributes(dataColumns),
       SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
       TaskContext.get().taskMemoryManager().pageSizeBytes)
 
     while (iterator.hasNext) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index fabd2fb..fb65b50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -41,6 +41,7 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
     val sorter = UnsafeExternalSorter.create(
       context.taskMemoryManager(),
       SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
       context,
       null,
       null,

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index e03bd6a..476d93f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -120,7 +120,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       metricsSystem = null))
 
     val sorter = new UnsafeKVExternalSorter(
-      keySchema, valueSchema, SparkEnv.get.blockManager, pageSize)
+      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index a29d55e..794fe26 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -279,6 +279,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
         StructType.fromAttributes(partitionOutput),
         StructType.fromAttributes(dataOutput),
         SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
         TaskContext.get().taskMemoryManager().pageSizeBytes)
 
       while (iterator.hasNext) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ace67a6..c56520b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -115,6 +115,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     assertValid()
     val hadoopConf = broadcastedHadoopConf.value
     val blockManager = SparkEnv.get.blockManager
+    val serializerManager = SparkEnv.get.serializerManager
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockId = partition.blockId
 
@@ -161,7 +162,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
         dataRead.rewind()
       }
-      blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
+      serializerManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
         .asInstanceOf[Iterator[T]]
     }
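
A short sketch (Scala, not part of the patch) of the read path this hunk changes: bytes read back from the write-ahead log are now deserialized through the SerializerManager rather than the BlockManager. The helper name and its placement inside Spark's streaming package are assumptions for illustration:

    package org.apache.spark.streaming.rdd

    import java.nio.ByteBuffer

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.BlockId
    import org.apache.spark.util.io.ChunkedByteBuffer

    object WALReadSketch {
      // dataRead holds the raw bytes recovered from the write-ahead log.
      def deserializeLoggedBlock[T](blockId: BlockId, dataRead: ByteBuffer): Iterator[T] = {
        val serializerManager = SparkEnv.get.serializerManager  // previously blockManager.dataDeserialize
        serializerManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
          .asInstanceOf[Iterator[T]]
      }
    }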
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 6d4f4b9..85350ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
 import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
@@ -123,6 +124,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult(
  */
 private[streaming] class WriteAheadLogBasedBlockHandler(
     blockManager: BlockManager,
+    serializerManager: SerializerManager,
     streamId: Int,
     storageLevel: StorageLevel,
     conf: SparkConf,
@@ -173,10 +175,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     val serializedBlock = block match {
       case ArrayBufferBlock(arrayBuffer) =>
         numRecords = Some(arrayBuffer.size.toLong)
-        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+        serializerManager.dataSerialize(blockId, arrayBuffer.iterator)
       case IteratorBlock(iterator) =>
         val countIterator = new CountingIterator(iterator)
-        val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
+        val serializedBlock = serializerManager.dataSerialize(blockId, countIterator)
         numRecords = countIterator.count
         serializedBlock
       case ByteBufferBlock(byteBuffer) =>
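
A companion sketch (Scala, not part of the patch) of the write path shown above: received records are serialized through a SerializerManager before being handed to the write-ahead log. The helper name and the use of SparkEnv to obtain the manager are illustrative (the handler itself now receives the SerializerManager as a constructor argument), and the return type follows the test changes below, which call .toByteBuffer on the result:

    package org.apache.spark.streaming.receiver

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.BlockId
    import org.apache.spark.util.io.ChunkedByteBuffer

    object WALWriteSketch {
      // Serialize an iterator of records for a block before the bytes are
      // written to the write-ahead log.
      def serializeForWAL(blockId: BlockId, records: Iterator[String]): ChunkedByteBuffer =
        SparkEnv.get.serializerManager.dataSerialize(blockId, records)
    }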

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index e41fd11..4fb0f8c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -60,7 +60,7 @@ private[streaming] class ReceiverSupervisorImpl(
             "Please use streamingContext.checkpoint() to set the checkpoint 
directory. " +
             "See documentation for more details.")
       }
-      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
+      new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
         receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
     } else {
       new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
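
And a sketch (Scala, not part of the patch) of the supervisor-side wiring this hunk updates; the wrapper method is illustrative, while the constructor argument order matches the hunk (the handler is private[streaming], hence the package line):

    package org.apache.spark.streaming.receiver

    import org.apache.hadoop.conf.Configuration

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.StorageLevel

    object SupervisorWiringSketch {
      // Build a WAL-based handler, now passing the SerializerManager from SparkEnv
      // alongside the BlockManager; the clock argument keeps its default.
      def makeWalHandler(
          env: SparkEnv,
          streamId: Int,
          storageLevel: StorageLevel,
          hadoopConf: Configuration,
          checkpointDir: String): WriteAheadLogBasedBlockHandler = {
        new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager,
          streamId, storageLevel, env.conf, hadoopConf, checkpointDir)
      }
    }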

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 122ca06..4e77cd6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -60,6 +60,7 @@ class ReceivedBlockHandlerSuite
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
   val shuffleManager = new HashShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
+  var serializerManager = new SerializerManager(serializer, conf)
   val manualClock = new ManualClock
   val blockManagerSize = 10000000
   val blockManagerBuffer = new ArrayBuffer[BlockManager]()
@@ -156,7 +157,7 @@ class ReceivedBlockHandlerSuite
           val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
           val bytes = reader.read(fileSegment)
           reader.close()
-          blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
+          serializerManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
         }
         loggedData shouldEqual data
       }
@@ -265,7 +266,6 @@ class ReceivedBlockHandlerSuite
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
-    val serializerManager = new SerializerManager(serializer, conf)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)
@@ -335,7 +335,8 @@ class ReceivedBlockHandlerSuite
       }
     }
 
-    def dataToByteBuffer(b: Seq[String]) = blockManager.dataSerialize(generateBlockId, b.iterator)
+    def dataToByteBuffer(b: Seq[String]) =
+      serializerManager.dataSerialize(generateBlockId, b.iterator)
 
     val blocks = data.grouped(10).toSeq
 
@@ -367,8 +368,8 @@ class ReceivedBlockHandlerSuite
   /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */
   private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
     require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1)
-    val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
-      storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+    val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, serializerManager,
+      1, storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
     try {
       body(receivedBlockHandler)
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index c4bf42d..ce5a6e0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
 import org.apache.spark.util.Utils
@@ -39,6 +40,7 @@ class WriteAheadLogBackedBlockRDDSuite
 
   var sparkContext: SparkContext = null
   var blockManager: BlockManager = null
+  var serializerManager: SerializerManager = null
   var dir: File = null
 
   override def beforeEach(): Unit = {
@@ -58,6 +60,7 @@ class WriteAheadLogBackedBlockRDDSuite
     super.beforeAll()
     sparkContext = new SparkContext(conf)
     blockManager = sparkContext.env.blockManager
+    serializerManager = sparkContext.env.serializerManager
   }
 
   override def afterAll(): Unit = {
@@ -65,6 +68,8 @@ class WriteAheadLogBackedBlockRDDSuite
     try {
       sparkContext.stop()
       System.clearProperty("spark.driver.port")
+      blockManager = null
+      serializerManager = null
     } finally {
       super.afterAll()
     }
@@ -107,8 +112,6 @@ class WriteAheadLogBackedBlockRDDSuite
    * It can also test if the partitions that were read from the log were again stored in
    * block manager.
    *
-   *
-   *
    * @param numPartitions Number of partitions in RDD
    * @param numPartitionsInBM Number of partitions to write to the BlockManager.
    *                          Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
@@ -223,7 +226,7 @@ class WriteAheadLogBackedBlockRDDSuite
     require(blockData.size === blockIds.size)
     val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
-      writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer)
+      writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer)
     }
     writer.close()
     segments

