Repository: spark Updated Branches: refs/heads/master 9d225a910 -> b55cade85
Remove the remoteFetchTime metric.

This metric is confusing: it adds up all of the time to fetch shuffle inputs, but fetches often happen in parallel, so remoteFetchTime can be much longer than the task execution time.

@squito it looks like you added this metric -- do you have a use case for it?

cc @shivaram -- I know you've looked at the shuffle performance a lot, so chime in here if this metric has turned out to be useful for you!

Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #62 from kayousterhout/remove_fetch_variable and squashes the following commits:

43341eb [Kay Ousterhout] Remove the remoteFetchTime metric.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b55cade8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b55cade8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b55cade8

Branch: refs/heads/master
Commit: b55cade853003d86356a50c6dba82210c8adb667
Parents: 9d225a9
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Mon Mar 3 16:12:00 2014 -0800
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Mon Mar 3 16:12:00 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/BlockStoreShuffleFetcher.scala | 1 -
 .../main/scala/org/apache/spark/executor/TaskMetrics.scala | 7 -------
 .../src/main/scala/org/apache/spark/scheduler/JobLogger.scala | 1 -
 .../scala/org/apache/spark/storage/BlockFetcherIterator.scala | 4 ----
 .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 1 -
 5 files changed, 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala 
b/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala index 754b46a..a673924 100644 --- a/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala @@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val completionIter = CompletionIterator[T, Iterator[T]](itr, { val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.shuffleFinishTime = System.currentTimeMillis - shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 4553399..760458c 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -104,13 +104,6 @@ class ShuffleReadMetrics extends Serializable { var fetchWaitTime: Long = _ /** - * Total time spent fetching remote shuffle blocks. This aggregates the time spent fetching all - * input blocks. Since block fetches are both pipelined and parallelized, this can - * exceed fetchWaitTime and executorRunTime. 
- */ - var remoteFetchTime: Long = _ - - /** * Total number of remote bytes read from the shuffle by this task */ var remoteBytesRead: Long = _ http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 006e2a3..80f9ec7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String) " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + - " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead case None => "" } http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index fb50b45..bcfc391 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi def totalBlocks: Int def numLocalBlocks: Int def numRemoteBlocks: Int - def remoteFetchTime: Long def fetchWaitTime: Long def remoteBytesRead: Long } @@ -79,7 +78,6 @@ object BlockFetcherIterator { import blockManager._ private var _remoteBytesRead = 0L - private var _remoteFetchTime = 0L private var _fetchWaitTime = 0L if (blocksByAddress == 
null) { @@ -125,7 +123,6 @@ object BlockFetcherIterator { future.onSuccess { case Some(message) => { val fetchDone = System.currentTimeMillis() - _remoteFetchTime += fetchDone - fetchStart val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) for (blockMessage <- blockMessageArray) { @@ -241,7 +238,6 @@ object BlockFetcherIterator { override def totalBlocks: Int = numLocal + numRemote override def numLocalBlocks: Int = numLocal override def numRemoteBlocks: Int = numRemote - override def remoteFetchTime: Long = _remoteFetchTime override def fetchWaitTime: Long = _fetchWaitTime override def remoteBytesRead: Long = _remoteBytesRead http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 368c515..7c4f2b4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc sm.localBlocksFetched should be > (0) sm.remoteBlocksFetched should be (0) sm.remoteBytesRead should be (0l) - sm.remoteFetchTime should be (0l) } } }