spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 881f2544e -> 88bbb5429


[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended. This only changes internal API.

Author: Andrew Or

Closes #10343 from andrewor14/clean-bm-serializer.

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bbb542
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bbb542
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bbb542

Branch: refs/heads/branch-1.6
Commit: 88bbb5429dd3efcff6b2835a70143247b08ae6b2
Parents: 881f254
Author: Andrew Or
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or
Committed: Thu Dec 17 12:01:13 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 29 +++++++++++++------------------
 .../org/apache/spark/storage/DiskStore.scala    | 10 ----------
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007f..2cc2fd9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }

   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteArrayOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     ByteBuffer.wrap(byteStream.toByteArray)
   }

@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }

   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
   */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }

   def stop(): Unit = {


http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }

-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
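The shape of the cleaned-up API is easier to see outside the diff: each data(De)Serialize* helper now goes through the one defaultSerializer owned by the BlockManager instead of threading a Serializer parameter through every call. Below is a minimal, self-contained sketch of that pattern. It is not Spark code: MiniBlockSerializer is a hypothetical stand-in, and plain Java serialization replaces Spark's Serializer machinery so the example runs on its own.

import java.io.{BufferedInputStream, BufferedOutputStream, ByteArrayInputStream,
  ByteArrayOutputStream, EOFException, InputStream, ObjectInputStream,
  ObjectOutputStream, OutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for the cleaned-up BlockManager API: one built-in
// serializer per instance, no per-call Serializer parameter.
class MiniBlockSerializer {

  // Mirrors dataSerializeStream(blockId, outputStream, values).
  def dataSerializeStream(outputStream: OutputStream, values: Iterator[Any]): Unit = {
    val out = new ObjectOutputStream(new BufferedOutputStream(outputStream))
    values.foreach(v => out.writeObject(v.asInstanceOf[AnyRef]))
    out.close()  // flushes the buffered stream and finishes the serialization
  }

  // Mirrors dataSerialize(blockId, values): serialize into a byte buffer.
  def dataSerialize(values: Iterator[Any]): ByteBuffer = {
    val byteStream = new ByteArrayOutputStream(4096)
    dataSerializeStream(byteStream, values)
    ByteBuffer.wrap(byteStream.toByteArray)
  }

  // Mirrors dataDeserializeStream(blockId, inputStream): lazily read objects
  // until the underlying stream is exhausted.
  def dataDeserializeStream(inputStream: InputStream): Iterator[Any] = {
    val in = new ObjectInputStream(new BufferedInputStream(inputStream))
    Iterator
      .continually(try Some(in.readObject()) catch { case _: EOFException => None })
      .takeWhile(_.isDefined)
      .flatten
  }

  // Mirrors dataDeserialize(blockId, bytes).
  def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
    bytes.rewind()
    val arr = new Array[Byte](bytes.remaining())
    bytes.get(arr)
    dataDeserializeStream(new ByteArrayInputStream(arr))
  }
}

object MiniBlockSerializerDemo extends App {
  val bm = new MiniBlockSerializer
  val buf = bm.dataSerialize(Iterator("a", "b", "c"))
  println(bm.dataDeserialize(buf).toList)  // List(a, b, c)
}

Running the demo prints List(a, b, c): the round trip always uses the single built-in serializer, so there is no per-call override left for callers such as DiskStore to thread through.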
spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager

Repository: spark
Updated Branches:
  refs/heads/master d1508dd9b -> 97678edea


[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended. This only changes internal API.

Author: Andrew Or

Closes #10343 from andrewor14/clean-bm-serializer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97678ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97678ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97678ede

Branch: refs/heads/master
Commit: 97678edeaaafc19ea18d044233a952d2e2e89fbc
Parents: d1508dd
Author: Andrew Or
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or
Committed: Wed Dec 16 20:01:47 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 29 +++++++++++++------------------
 .../org/apache/spark/storage/DiskStore.scala    | 10 ----------
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 540e1ec..6074fc5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }

   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     byteStream.toByteBuffer
   }

@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }

   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
   */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }

   def stop(): Unit = {


http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }

-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
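The master patch differs from the branch-1.6 backport in one detail of dataSerialize: it writes into a ByteBufferOutputStream and returns byteStream.toByteBuffer, whereas branch-1.6 keeps ByteArrayOutputStream and ByteBuffer.wrap(byteStream.toByteArray), which copies the accumulated bytes. The following is a minimal sketch of the wrap-without-copy idea, given as an assumption for illustration rather than the actual org.apache.spark.util.ByteBufferOutputStream source.

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Sketch (hypothetical class): expose the stream's contents as a ByteBuffer
// without the extra array copy that toByteArray performs. `buf` and `count`
// are the protected fields inherited from java.io.ByteArrayOutputStream.
class WrappingByteBufferOutputStream(initialSize: Int)
    extends ByteArrayOutputStream(initialSize) {
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}

object WrapDemo extends App {
  val out = new WrappingByteBufferOutputStream(4096)
  out.write("block data".getBytes("UTF-8"))
  val bb = out.toByteBuffer   // wraps the internal array; no copy
  println(bb.remaining())     // 10
}

The trade-off is that the returned buffer aliases the stream's internal growable array: the array may carry slack capacity beyond count, and the stream must not be written to after the buffer is handed out.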