spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager

2015-12-17 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 881f2544e -> 88bbb5429


[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended. This only changes internal API.

Author: Andrew Or 

Closes #10343 from andrewor14/clean-bm-serializer.

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala
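For reference, the simplified signatures after this cleanup, exactly as they end up in the diff below; every path now goes through the block manager's own defaultSerializer:

  def dataSerializeStream(blockId: BlockId, outputStream: OutputStream, values: Iterator[Any]): Unit
  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer
  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any]
  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any]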


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bbb542
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bbb542
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bbb542

Branch: refs/heads/branch-1.6
Commit: 88bbb5429dd3efcff6b2835a70143247b08ae6b2
Parents: 881f254
Author: Andrew Or 
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or 
Committed: Thu Dec 17 12:01:13 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 29 +++++++++++------------------
 .../org/apache/spark/storage/DiskStore.scala    | 10 ----------
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007f..2cc2fd9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteArrayOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     ByteBuffer.wrap(byteStream.toByteArray)
   }
 
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {

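A minimal sketch of the round trip these methods now provide (hypothetical caller: `bm` is assumed to be an already-initialized BlockManager; TestBlockId is the simple BlockId implementation used by Spark's tests):

  import org.apache.spark.storage.TestBlockId

  // Hypothetical usage; both directions pick up bm's defaultSerializer.
  val blockId = TestBlockId("round-trip-example")
  val buffer = bm.dataSerialize(blockId, Iterator("a", "b", "c"))
  val restored = bm.dataDeserialize(blockId, buffer) // disposes of buffer once exhausted
  assert(restored.toList == List("a", "b", "c"))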
http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
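With the custom-serializer overload removed, the default path visible in the context lines of this hunk is the only getValues left on DiskStore:

  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
  }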

spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager

2015-12-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master d1508dd9b -> 97678edea


[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended. This only changes internal API.

Author: Andrew Or 

Closes #10343 from andrewor14/clean-bm-serializer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97678ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97678ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97678ede

Branch: refs/heads/master
Commit: 97678edeaaafc19ea18d044233a952d2e2e89fbc
Parents: d1508dd
Author: Andrew Or 
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or 
Committed: Wed Dec 16 20:01:47 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 29 +++++++++++------------------
 .../org/apache/spark/storage/DiskStore.scala    | 10 ----------
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 540e1ec..6074fc5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     byteStream.toByteBuffer
   }
 
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
   * Deserializes a InputStream into an iterator of values and disposes of it when the end of
   * the iterator is reached.
   */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {

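One incidental difference from the branch-1.6 backport above: on master, dataSerialize writes into Spark's ByteBufferOutputStream and returns byteStream.toByteBuffer, which avoids the full array copy that ByteArrayOutputStream.toByteArray makes before ByteBuffer.wrap. A sketch of that idea (illustrative subclass, not the actual Spark class):

  import java.io.ByteArrayOutputStream
  import java.nio.ByteBuffer

  // Sketch only: wrap the internal buffer up to the bytes written so far,
  // instead of copying it into a fresh right-sized array first.
  class SketchByteBufferOutputStream(capacity: Int) extends ByteArrayOutputStream(capacity) {
    def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
  }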
http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-