Repository: spark
Updated Branches:
  refs/heads/branch-1.4 4afb578b7 -> 762ff2e11
Some minor cleanup after SPARK-4550.

JoshRosen this PR addresses the comments you left on #4450 after it got merged.

Author: Sandy Ryza <sa...@cloudera.com>

Closes #5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits:

dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.

(cherry picked from commit 0092abb47a0f9fdc716d5dfc1c591ddb45de8c98)
Signed-off-by: Josh Rosen <joshro...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/762ff2e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/762ff2e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/762ff2e1

Branch: refs/heads/branch-1.4
Commit: 762ff2e1138ae59c5e67cb068bba1a34aa54a5bf
Parents: 4afb578
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Tue May 5 18:32:16 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue May 5 18:33:04 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/BlockObjectWriter.scala | 9 ++-------
 .../util/collection/PartitionedSerializedPairBuffer.scala | 4 ++--
 2 files changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/762ff2e1/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 499dd97..8bc4e20 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -59,7 +59,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) extends Ou
   def write(key: Any, value: Any)
 
   /**
-   * Notify the writer that a record worth of bytes has been written with writeBytes.
+   * Notify the writer that a record worth of bytes has been written with OutputStream#write.
    */
   def recordWritten()
 
@@ -215,12 +215,7 @@ private[spark] class DiskBlockObjectWriter(
 
     objOut.writeKey(key)
     objOut.writeValue(value)
-    numRecordsWritten += 1
-    writeMetrics.incShuffleRecordsWritten(1)
-
-    if (numRecordsWritten % 32 == 0) {
-      updateBytesWritten()
-    }
+    recordWritten()
   }
 
   override def write(b: Int): Unit = throw new UnsupportedOperationException()

http://git-wip-us.apache.org/repos/asf/spark/blob/762ff2e1/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
index b5ca0c6..ac9ea63 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
@@ -71,10 +71,10 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
     if (keyStart < 0) {
       throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes")
     }
-    kvSerializationStream.writeObject[Any](key)
+    kvSerializationStream.writeKey[Any](key)
     kvSerializationStream.flush()
     val valueStart = kvBuffer.size
-    kvSerializationStream.writeObject[Any](value)
+    kvSerializationStream.writeValue[Any](value)
     kvSerializationStream.flush()
     val valueEnd = kvBuffer.size
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org