Repository: spark
Updated Branches:
  refs/heads/master 9d225a910 -> b55cade85


Remove the remoteFetchTime metric.

This metric is confusing: it adds up all of the time to fetch
shuffle inputs, but fetches often happen in parallel, so
remoteFetchTime can be much longer than the task execution time.

@squito it looks like you added this metric -- do you have a use case for it?

cc @shivaram -- I know you've looked at the shuffle performance a lot so chime 
in here if this metric has turned out to be useful for you!

Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #62 from kayousterhout/remove_fetch_variable and squashes the following 
commits:

43341eb [Kay Ousterhout] Remove the remoteFetchTime metric.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b55cade8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b55cade8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b55cade8

Branch: refs/heads/master
Commit: b55cade853003d86356a50c6dba82210c8adb667
Parents: 9d225a9
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Mon Mar 3 16:12:00 2014 -0800
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Mon Mar 3 16:12:00 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/BlockStoreShuffleFetcher.scala     | 1 -
 .../main/scala/org/apache/spark/executor/TaskMetrics.scala    | 7 -------
 .../src/main/scala/org/apache/spark/scheduler/JobLogger.scala | 1 -
 .../scala/org/apache/spark/storage/BlockFetcherIterator.scala | 4 ----
 .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 1 -
 5 files changed, 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
index 754b46a..a673924 100644
--- a/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
@@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
     val completionIter = CompletionIterator[T, Iterator[T]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
       shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
-      shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
       shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks

http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 4553399..760458c 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -104,13 +104,6 @@ class ShuffleReadMetrics extends Serializable {
   var fetchWaitTime: Long = _
 
   /**
-   * Total time spent fetching remote shuffle blocks. This aggregates the time spent fetching all
-   * input blocks. Since block fetches are both pipelined and parallelized, this can
-   * exceed fetchWaitTime and executorRunTime.
-   */
-  var remoteFetchTime: Long = _
-
-  /**
    * Total number of remote bytes read from the shuffle by this task
    */
   var remoteBytesRead: Long = _

http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 006e2a3..80f9ec7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String)
         " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
         " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
         " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
-        " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
         " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
       case None => ""
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index fb50b45..bcfc391 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi
   def totalBlocks: Int
   def numLocalBlocks: Int
   def numRemoteBlocks: Int
-  def remoteFetchTime: Long
   def fetchWaitTime: Long
   def remoteBytesRead: Long
 }
@@ -79,7 +78,6 @@ object BlockFetcherIterator {
     import blockManager._
 
     private var _remoteBytesRead = 0L
-    private var _remoteFetchTime = 0L
     private var _fetchWaitTime = 0L
 
     if (blocksByAddress == null) {
@@ -125,7 +123,6 @@ object BlockFetcherIterator {
       future.onSuccess {
         case Some(message) => {
           val fetchDone = System.currentTimeMillis()
-          _remoteFetchTime += fetchDone - fetchStart
           val bufferMessage = message.asInstanceOf[BufferMessage]
           val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
           for (blockMessage <- blockMessageArray) {
@@ -241,7 +238,6 @@ object BlockFetcherIterator {
     override def totalBlocks: Int = numLocal + numRemote
     override def numLocalBlocks: Int = numLocal
     override def numRemoteBlocks: Int = numRemote
-    override def remoteFetchTime: Long = _remoteFetchTime
     override def fetchWaitTime: Long = _fetchWaitTime
     override def remoteBytesRead: Long = _remoteBytesRead
  

http://git-wip-us.apache.org/repos/asf/spark/blob/b55cade8/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 368c515..7c4f2b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
           sm.localBlocksFetched should be > (0)
           sm.remoteBlocksFetched should be (0)
           sm.remoteBytesRead should be (0l)
-          sm.remoteFetchTime should be (0l)
         }
       }
     }

Reply via email to