Repository: spark
Updated Branches:
  refs/heads/master 6ce008ba4 -> 3de24ae2e
http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9e08675..d3bfb00 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -52,14 +53,16 @@ public final class UnsafeKVExternalSorter {
       StructType keySchema,
       StructType valueSchema,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       long pageSizeBytes) throws IOException {
-    this(keySchema, valueSchema, blockManager, pageSizeBytes, null);
+    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
   }
 
   public UnsafeKVExternalSorter(
       StructType keySchema,
       StructType valueSchema,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       long pageSizeBytes,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
@@ -77,6 +80,7 @@ public final class UnsafeKVExternalSorter {
       sorter = UnsafeExternalSorter.create(
         taskMemoryManager,
         blockManager,
+        serializerManager,
         taskContext,
         recordComparator,
         prefixComparator,
@@ -116,6 +120,7 @@ public final class UnsafeKVExternalSorter {
       sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
         taskMemoryManager,
         blockManager,
+        serializerManager,
         taskContext,
         new KVComparator(ordering, keySchema.length()),
         prefixComparator,

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index a4c0e1c..270c09a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -339,6 +339,7 @@ case class Window(
           sorter = UnsafeExternalSorter.create(
             TaskContext.get().taskMemoryManager(),
             SparkEnv.get.blockManager,
+            SparkEnv.get.serializerManager,
             TaskContext.get(),
             null,
             null,
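Call-site sketch (illustrative only, not part of the patch): how a task-side caller builds an
UnsafeKVExternalSorter with the constructor widened above, as the SQL and Hive call sites below
now do. The schemas and the buildSorter helper are placeholders, and the snippet assumes it runs
where SparkEnv.get and TaskContext.get() are available.

    import org.apache.spark.{SparkEnv, TaskContext}
    import org.apache.spark.sql.execution.UnsafeKVExternalSorter
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    def buildSorter(): UnsafeKVExternalSorter = {
      val keySchema = new StructType().add("key", IntegerType)     // placeholder key schema
      val valueSchema = new StructType().add("value", StringType)  // placeholder value schema
      new UnsafeKVExternalSorter(
        keySchema,
        valueSchema,
        SparkEnv.get.blockManager,
        SparkEnv.get.serializerManager,  // the parameter added by this change
        TaskContext.get().taskMemoryManager().pageSizeBytes)
    }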
http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index c74ac8a..233ac26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -399,6 +399,7 @@ private[sql] class DynamicPartitionWriterContainer(
         sortingKeySchema,
         StructType.fromAttributes(dataColumns),
         SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
         TaskContext.get().taskMemoryManager().pageSizeBytes)
 
       while (iterator.hasNext) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index fabd2fb..fb65b50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -41,6 +41,7 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
     val sorter = UnsafeExternalSorter.create(
       context.taskMemoryManager(),
       SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
       context,
       null,
       null,

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index e03bd6a..476d93f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -120,7 +120,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
         metricsSystem = null))
 
     val sorter = new UnsafeKVExternalSorter(
-      keySchema, valueSchema, SparkEnv.get.blockManager, pageSize)
+      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index a29d55e..794fe26 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -279,6 +279,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       StructType.fromAttributes(partitionOutput),
       StructType.fromAttributes(dataOutput),
       SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
       TaskContext.get().taskMemoryManager().pageSizeBytes)
 
     while (iterator.hasNext) {
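Test-setup sketch (illustrative only, not part of the patch): code that does not run inside a full
SparkEnv can build a SerializerManager directly from a Serializer and a SparkConf; the streaming
suite further down switches to exactly this pattern so one shared instance can serialize and read
back test data.

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{KryoSerializer, SerializerManager}

    val conf = new SparkConf(false)                               // empty conf, loads no defaults
    val serializer = new KryoSerializer(conf)
    val serializerManager = new SerializerManager(serializer, conf)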
http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ace67a6..c56520b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -115,6 +115,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     assertValid()
     val hadoopConf = broadcastedHadoopConf.value
     val blockManager = SparkEnv.get.blockManager
+    val serializerManager = SparkEnv.get.serializerManager
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockId = partition.blockId
 
@@ -161,7 +162,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
             logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
             dataRead.rewind()
           }
-          blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
+          serializerManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
             .asInstanceOf[Iterator[T]]
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 6d4f4b9..85350ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
 import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
@@ -123,6 +124,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult(
  */
 private[streaming] class WriteAheadLogBasedBlockHandler(
     blockManager: BlockManager,
+    serializerManager: SerializerManager,
     streamId: Int,
     storageLevel: StorageLevel,
     conf: SparkConf,
@@ -173,10 +175,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     val serializedBlock = block match {
       case ArrayBufferBlock(arrayBuffer) =>
         numRecords = Some(arrayBuffer.size.toLong)
-        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+        serializerManager.dataSerialize(blockId, arrayBuffer.iterator)
       case IteratorBlock(iterator) =>
         val countIterator = new CountingIterator(iterator)
-        val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
+        val serializedBlock = serializerManager.dataSerialize(blockId, countIterator)
         numRecords = countIterator.count
         serializedBlock
       case ByteBufferBlock(byteBuffer) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index e41fd11..4fb0f8c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -60,7 +60,7 @@ private[streaming] class ReceiverSupervisorImpl(
           "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
           "See documentation for more details.")
       }
-      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
+      new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
         receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
     } else {
       new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)

http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 122ca06..4e77cd6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -60,6 +60,7 @@ class ReceivedBlockHandlerSuite
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
   val shuffleManager = new HashShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
+  var serializerManager = new SerializerManager(serializer, conf)
   val manualClock = new ManualClock
   val blockManagerSize = 10000000
   val blockManagerBuffer = new ArrayBuffer[BlockManager]()
@@ -156,7 +157,7 @@ class ReceivedBlockHandlerSuite
         val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
         val bytes = reader.read(fileSegment)
         reader.close()
-        blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
+        serializerManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
       }
       loggedData shouldEqual data
     }
@@ -265,7 +266,6 @@ class ReceivedBlockHandlerSuite
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
-    val serializerManager = new SerializerManager(serializer, conf)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)
@@ -335,7 +335,8 @@ class ReceivedBlockHandlerSuite
       }
     }
 
-    def dataToByteBuffer(b: Seq[String]) = blockManager.dataSerialize(generateBlockId, b.iterator)
+    def dataToByteBuffer(b: Seq[String]) =
+      serializerManager.dataSerialize(generateBlockId, b.iterator)
 
     val blocks = data.grouped(10).toSeq
 
@@ -367,8 +368,8 @@ class ReceivedBlockHandlerSuite
   /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */
   private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
     require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1)
-    val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
-      storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+    val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, serializerManager,
+      1, storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
     try {
       body(receivedBlockHandler)
     } finally {
http://git-wip-us.apache.org/repos/asf/spark/blob/3de24ae2/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index c4bf42d..ce5a6e0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
 import org.apache.spark.util.Utils
@@ -39,6 +40,7 @@ class WriteAheadLogBackedBlockRDDSuite
 
   var sparkContext: SparkContext = null
   var blockManager: BlockManager = null
+  var serializerManager: SerializerManager = null
   var dir: File = null
 
   override def beforeEach(): Unit = {
@@ -58,6 +60,7 @@ class WriteAheadLogBackedBlockRDDSuite
     super.beforeAll()
     sparkContext = new SparkContext(conf)
     blockManager = sparkContext.env.blockManager
+    serializerManager = sparkContext.env.serializerManager
   }
 
   override def afterAll(): Unit = {
@@ -65,6 +68,8 @@ class WriteAheadLogBackedBlockRDDSuite
     try {
       sparkContext.stop()
       System.clearProperty("spark.driver.port")
+      blockManager = null
+      serializerManager = null
     } finally {
       super.afterAll()
     }
@@ -107,8 +112,6 @@ class WriteAheadLogBackedBlockRDDSuite
    * It can also test if the partitions that were read from the log were again stored in
    * block manager.
    *
-   *
-   *
    * @param numPartitions Number of partitions in RDD
    * @param numPartitionsInBM Number of partitions to write to the BlockManager.
    *                          Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
@@ -223,7 +226,7 @@ class WriteAheadLogBackedBlockRDDSuite
     require(blockData.size === blockIds.size)
     val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
-      writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer)
+      writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer)
     }
     writer.close()
     segments


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org