Repository: spark
Updated Branches:
  refs/heads/master 57961567e -> dcd1e42d6


[SPARK-4874] [CORE] Collect record count metrics

Collects record counts for both Input/Output and Shuffle Metrics. For the 
input/output metrics, the counter is simply incremented each time the iterator 
is accessed.
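
As a rough, self-contained sketch of that pattern (plain Scala; CountingIterator 
and the onRecord callback are illustrative names, not Spark APIs), the counting 
just wraps the underlying iterator and bumps a counter inside next():

    class CountingIterator[T](delegate: Iterator[T], onRecord: () => Unit) extends Iterator[T] {
      override def hasNext: Boolean = delegate.hasNext
      override def next(): T = {
        onRecord()        // record-level metric update happens here
        delegate.next()
      }
    }

    var recordsRead = 0L
    val counted = new CountingIterator(Iterator("a", "b", "c"), () => recordsRead += 1)
    counted.foreach(_ => ())   // recordsRead is now 3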

For shuffle, on the write side we count records post-aggregation (after a 
map-side combine), and on the read side we count them pre-aggregation. This 
allows the bytes read/written metrics and the records read/written metrics to 
line up.
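
For intuition only, here is a small plain-Scala sketch (no Spark involved, all 
names hypothetical) of why counting post-combine on the write side and 
pre-aggregation on the read side makes both counts refer to the same records 
that actually cross the shuffle boundary:

    val mapOutput = Seq("a" -> 1, "b" -> 1, "a" -> 1, "c" -> 1, "b" -> 1)

    // Write side: map-side combine first, then count what is written to the shuffle.
    val combined = mapOutput.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
    val shuffleRecordsWritten = combined.size      // 3 combined records, not 5 raw ones

    // Read side: count each record as it is fetched, before any further aggregation.
    var shuffleRecordsRead = 0L
    val fetched = combined.iterator.map { rec => shuffleRecordsRead += 1; rec }
    fetched.foreach(_ => ())                       // shuffleRecordsRead is also 3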

For backwards compatibility, if we deserialize an older event that doesn't have 
record metrics, we set the metric to -1.
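
A minimal json4s sketch of that compatibility pattern (the field name matches 
the new protocol; the sentinel default shown here follows the description above 
and is an illustration, not the exact JsonProtocol code):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    implicit val formats: Formats = DefaultFormats

    // An older event that predates the record-count fields.
    val oldEvent = parse("""{"Remote Bytes Read": 1024}""")

    // extractOpt yields None for the missing field, so a sentinel default is used.
    val recordsRead = (oldEvent \ "Total Records Read").extractOpt[Long].getOrElse(-1L)
    // recordsRead == -1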

Author: Kostas Sakellis <kos...@cloudera.com>

Closes #4067 from ksakellis/kostas-spark-4874 and squashes the following 
commits:

bd919be [Kostas Sakellis] Changed 'Records Read' in shuffleReadMetrics json 
output to 'Total Records Read'
dad4d57 [Kostas Sakellis] Add a comment and check to BlockObjectWriter so that 
it cannot be reopened.
6f236a1 [Kostas Sakellis] Renamed _recordsWritten in ShuffleWriteMetrics to be 
more consistent
70620a0 [Kostas Sakellis] CR Feedback
17faa3a [Kostas Sakellis] Removed AtomicLong in favour of using Long
b6f9923 [Kostas Sakellis] Merge AfterNextInterceptingIterator with 
InterruptibleIterator to save a function call
46c8186 [Kostas Sakellis] Combined Bytes and # records into one column
57551c1 [Kostas Sakellis] Conforms to SPARK-3288
6cdb44e [Kostas Sakellis] Removed the generic InterceptingIterator and replaced 
it with a specific implementation
1aa273c [Kostas Sakellis] CR Feedback
1bb78b1 [Kostas Sakellis] [SPARK-4874] [CORE] Collect record count metrics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dcd1e42d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dcd1e42d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dcd1e42d

Branch: refs/heads/master
Commit: dcd1e42d6b6ac08d2c0736bf61a15f515a1f222b
Parents: 5796156
Author: Kostas Sakellis <kos...@cloudera.com>
Authored: Fri Feb 6 14:31:20 2015 -0800
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Fri Feb 6 14:31:20 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |  11 +-
 .../org/apache/spark/executor/TaskMetrics.scala |  54 ++++--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   6 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   6 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  12 +-
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |   8 +-
 .../org/apache/spark/storage/BlockManager.scala |   2 +-
 .../spark/storage/BlockObjectWriter.scala       |  24 ++-
 .../scala/org/apache/spark/ui/ToolTips.scala    |   9 +-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |   6 +
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  86 +++++++--
 .../spark/ui/jobs/JobProgressListener.scala     |  24 +++
 .../org/apache/spark/ui/jobs/StagePage.scala    | 148 +++++++++------
 .../scala/org/apache/spark/ui/jobs/UIData.scala |  14 ++
 .../org/apache/spark/util/JsonProtocol.scala    |  19 +-
 .../spark/util/collection/ExternalSorter.scala  |   1 +
 .../spark/metrics/InputOutputMetricsSuite.scala | 186 ++++++++++++++++---
 .../spark/storage/BlockObjectWriterSuite.scala  |  21 +++
 .../ui/jobs/JobProgressListenerSuite.scala      |   2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  55 +++++-
 20 files changed, 548 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala 
b/core/src/main/scala/org/apache/spark/CacheManager.scala
index a0c0372..a96d754 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
           .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.addBytesRead(inputMetrics.bytesRead)
-
-        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+        existingMetrics.incBytesRead(inputMetrics.bytesRead)
 
+        val iter = blockResult.data.asInstanceOf[Iterator[T]]
+        new InterruptibleIterator[T](context, iter) {
+          override def next(): T = {
+            existingMetrics.incRecordsRead(1)
+            delegate.next()
+          }
+        }
       case None =>
         // Acquire a lock for loading this partition
        // If another thread already holds the lock, wait for it to finish and return its results

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 97912c6..d056591 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -194,18 +194,19 @@ class TaskMetrics extends Serializable {
   /**
    * Aggregates shuffle read metrics for all registered dependencies into 
shuffleReadMetrics.
    */
-  private[spark] def updateShuffleReadMetrics() = synchronized {
+  private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
     val merged = new ShuffleReadMetrics()
     for (depMetrics <- depsShuffleReadMetrics) {
       merged.incFetchWaitTime(depMetrics.fetchWaitTime)
       merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
       merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
       merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+      merged.incRecordsRead(depMetrics.recordsRead)
     }
     _shuffleReadMetrics = Some(merged)
   }
 
-  private[spark] def updateInputMetrics() = synchronized {
+  private[spark] def updateInputMetrics(): Unit = synchronized {
     inputMetrics.foreach(_.updateBytesRead())
   }
 }
@@ -242,27 +243,31 @@ object DataWriteMethod extends Enumeration with Serializable {
 @DeveloperApi
 case class InputMetrics(readMethod: DataReadMethod.Value) {
 
-  private val _bytesRead: AtomicLong = new AtomicLong()
+  /**
+   * This is volatile so that it is visible to the updater thread.
+   */
+  @volatile @transient var bytesReadCallback: Option[() => Long] = None
 
   /**
    * Total bytes read.
    */
-  def bytesRead: Long = _bytesRead.get()
-  @volatile @transient var bytesReadCallback: Option[() => Long] = None
+  private var _bytesRead: Long = _
+  def bytesRead: Long = _bytesRead
+  def incBytesRead(bytes: Long) = _bytesRead += bytes
 
   /**
-   * Adds additional bytes read for this read method.
+   * Total records read.
    */
-  def addBytesRead(bytes: Long) = {
-    _bytesRead.addAndGet(bytes)
-  }
+  private var _recordsRead: Long = _
+  def recordsRead: Long = _recordsRead
+  def incRecordsRead(records: Long) =  _recordsRead += records
 
   /**
    * Invoke the bytesReadCallback and mutate bytesRead.
    */
   def updateBytesRead() {
     bytesReadCallback.foreach { c =>
-      _bytesRead.set(c())
+      _bytesRead = c()
     }
   }
 
@@ -287,6 +292,13 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
   private var _bytesWritten: Long = _
   def bytesWritten = _bytesWritten
   private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
+
+  /**
+   * Total records written
+   */
+  private var _recordsWritten: Long = 0L
+  def recordsWritten = _recordsWritten
+  private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
 }
 
 /**
@@ -301,7 +313,7 @@ class ShuffleReadMetrics extends Serializable {
   private var _remoteBlocksFetched: Int = _
   def remoteBlocksFetched = _remoteBlocksFetched
  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
-  private[spark] def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
   
   /**
    * Number of local blocks fetched in this shuffle by this task
@@ -309,8 +321,7 @@ class ShuffleReadMetrics extends Serializable {
   private var _localBlocksFetched: Int = _
   def localBlocksFetched = _localBlocksFetched
  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
-  private[spark] def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
-
+  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
 
   /**
    * Time the task spent waiting for remote shuffle blocks. This only includes 
the time
@@ -334,6 +345,14 @@ class ShuffleReadMetrics extends Serializable {
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
   def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
+
+  /**
+   * Total number of records read from the shuffle by this task
+   */
+  private var _recordsRead: Long = _
+  def recordsRead = _recordsRead
+  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
+  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
 }
 
 /**
@@ -358,5 +377,12 @@ class ShuffleWriteMetrics extends Serializable {
  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
   
-
+  /**
+   * Total number of records written to the shuffle by this task
+   */
+  @volatile private var _shuffleRecordsWritten: Long = _
+  def shuffleRecordsWritten = _shuffleRecordsWritten
+  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
+  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
+  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 89adddc..486e86c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -247,7 +247,9 @@ class HadoopRDD[K, V](
           case eof: EOFException =>
             finished = true
         }
-
+        if (!finished) {
+          inputMetrics.incRecordsRead(1)
+        }
         (key, value)
       }
 
@@ -261,7 +263,7 @@ class HadoopRDD[K, V](
            // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.addBytesRead(split.inputSplit.value.getLength)
+              inputMetrics.incBytesRead(split.inputSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for 
task", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 44b9ffd..7fb9484 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -151,7 +151,9 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-
+        if (!finished) {
+          inputMetrics.incRecordsRead(1)
+        }
         (reader.getCurrentKey, reader.getCurrentValue)
       }
 
@@ -165,7 +167,7 @@ class NewHadoopRDD[K, V](
            // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for 
task", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 49b88a9..955b42c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, 
JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => 
NewOutputFormat,
-RecordWriter => NewRecordWriter}
+  RecordWriter => NewRecordWriter}
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
@@ -993,8 +993,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val (outputMetrics, bytesWrittenCallback) = 
initHadoopOutputMetrics(context)
 
       val writer = 
format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      var recordsWritten = 0L
       try {
-        var recordsWritten = 0L
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
@@ -1008,6 +1008,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       }
       committer.commitTask(hadoopContext)
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) 
}
+      outputMetrics.setRecordsWritten(recordsWritten)
       1
     } : Int
 
@@ -1065,8 +1066,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
+      var recordsWritten = 0L
       try {
-        var recordsWritten = 0L
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], 
record._2.asInstanceOf[AnyRef])
@@ -1080,6 +1081,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       }
       writer.commit()
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) 
}
+      outputMetrics.setRecordsWritten(recordsWritten)
     }
 
     self.context.runJob(self, writeToFile)
@@ -1097,9 +1099,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
   private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
       outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
-    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
-        && bytesWrittenCallback.isDefined) {
+    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+      outputMetrics.setRecordsWritten(recordsWritten)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
 
b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index e3e7434..7a2c5ae 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -86,6 +86,12 @@ private[hash] object BlockStoreShuffleFetcher extends 
Logging {
       context.taskMetrics.updateShuffleReadMetrics()
     })
 
-    new InterruptibleIterator[T](context, completionIter)
+    new InterruptibleIterator[T](context, completionIter) {
+      val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+      override def next(): T = {
+        readMetrics.incRecordsRead(1)
+        delegate.next()
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8bc5a1c..86dbd89 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
     readMethod: DataReadMethod.Value,
     bytes: Long) {
   val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.addBytesRead(bytes)
+  inputMetrics.incBytesRead(bytes)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 3198d76..8116417 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -29,7 +29,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics
  * appending data to an existing block, and can guarantee atomicity in the 
case of faults
  * as it allows the caller to revert partial writes.
  *
- * This interface does not support concurrent writes.
+ * This interface does not support concurrent writes. Also, once the writer has
+ * been opened, it cannot be reopened again.
  */
 private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
@@ -95,6 +96,7 @@ private[spark] class DiskBlockObjectWriter(
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private var initialized = false
+  private var hasBeenClosed = false
 
   /**
    * Cursors used to represent positions in the file.
@@ -115,11 +117,16 @@ private[spark] class DiskBlockObjectWriter(
   private var finalPosition: Long = -1
   private var reportedPosition = initialPosition
 
-  /** Calling channel.position() to update the write metrics can be a little bit expensive, so we
-    * only call it every N writes */
-  private var writesSinceMetricsUpdate = 0
+  /**
+   * Keep track of number of records written and also use this to periodically
+   * output bytes written since the latter is expensive to do for each record.
+   */
+  private var numRecordsWritten = 0
 
   override def open(): BlockObjectWriter = {
+    if (hasBeenClosed) {
+      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
+    }
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
@@ -145,6 +152,7 @@ private[spark] class DiskBlockObjectWriter(
       ts = null
       objOut = null
       initialized = false
+      hasBeenClosed = true
     }
   }
 
@@ -168,6 +176,7 @@ private[spark] class DiskBlockObjectWriter(
   override def revertPartialWritesAndClose() {
     try {
       writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
+      writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
 
       if (initialized) {
         objOut.flush()
@@ -193,12 +202,11 @@ private[spark] class DiskBlockObjectWriter(
     }
 
     objOut.writeObject(value)
+    numRecordsWritten += 1
+    writeMetrics.incShuffleRecordsWritten(1)
 
-    if (writesSinceMetricsUpdate == 32) {
-      writesSinceMetricsUpdate = 0
+    if (numRecordsWritten % 32 == 0) {
       updateBytesWritten()
-    } else {
-      writesSinceMetricsUpdate += 1
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala 
b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 4307029..3a15e60 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -29,14 +29,15 @@ private[spark] object ToolTips {
   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from 
remote machines."
 
-  val INPUT = "Bytes read from Hadoop or from Spark storage."
+  val INPUT = "Bytes and records read from Hadoop or from Spark storage."
 
-  val OUTPUT = "Bytes written to Hadoop."
+  val OUTPUT = "Bytes and records written to Hadoop."
 
-  val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
+  val SHUFFLE_WRITE =
+    "Bytes and records written to disk in order to be read by a shuffle in a future stage."
 
   val SHUFFLE_READ =
-    """Bytes read from remote executors. Typically less than shuffle write bytes
+    """Bytes and records read from remote executors. Typically less than shuffle write bytes
        because this does not include shuffle data read locally."""
 
   val GETTING_RESULT_TIME =

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala 
b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index a38cb75..3afd7ef 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,7 +48,9 @@ class ExecutorsListener(storageStatusListener: 
StorageStatusListener) extends Sp
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
   val executorToInputBytes = HashMap[String, Long]()
+  val executorToInputRecords = HashMap[String, Long]()
   val executorToOutputBytes = HashMap[String, Long]()
+  val executorToOutputRecords = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
   val executorToLogUrls = HashMap[String, Map[String, String]]()
@@ -84,10 +86,14 @@ class ExecutorsListener(storageStatusListener: 
StorageStatusListener) extends Sp
         metrics.inputMetrics.foreach { inputMetrics =>
           executorToInputBytes(eid) =
             executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+          executorToInputRecords(eid) =
+            executorToInputRecords.getOrElse(eid, 0L) + 
inputMetrics.recordsRead
         }
         metrics.outputMetrics.foreach { outputMetrics =>
           executorToOutputBytes(eid) =
             executorToOutputBytes.getOrElse(eid, 0L) + 
outputMetrics.bytesWritten
+          executorToOutputRecords(eid) =
+            executorToOutputRecords.getOrElse(eid, 0L) + 
outputMetrics.recordsWritten
         }
         metrics.shuffleReadMetrics.foreach { shuffleRead =>
           executorToShuffleRead(eid) =

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 9836d11..1f8536d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -36,6 +36,20 @@ private[ui] class ExecutorTable(stageId: Int, 
stageAttemptId: Int, parent: Stage
 
   /** Special table which merges two header cells. */
   private def executorTable[T](): Seq[Node] = {
+    val stageData = listener.stageIdToData.get((stageId, stageAttemptId))
+    var hasInput = false
+    var hasOutput = false
+    var hasShuffleWrite = false
+    var hasShuffleRead = false
+    var hasBytesSpilled = false
+    stageData.foreach(data => {
+        hasInput = data.hasInput
+        hasOutput = data.hasOutput
+        hasShuffleRead = data.hasShuffleRead
+        hasShuffleWrite = data.hasShuffleWrite
+        hasBytesSpilled = data.hasBytesSpilled
+    })
+
     <table class={UIUtils.TABLE_CLASS_STRIPED}>
       <thead>
         <th>Executor ID</th>
@@ -44,12 +58,32 @@ private[ui] class ExecutorTable(stageId: Int, 
stageAttemptId: Int, parent: Stage
         <th>Total Tasks</th>
         <th>Failed Tasks</th>
         <th>Succeeded Tasks</th>
-        <th><span data-toggle="tooltip" 
title={ToolTips.INPUT}>Input</span></th>
-        <th><span data-toggle="tooltip" 
title={ToolTips.OUTPUT}>Output</span></th>
-        <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle 
Read</span></th>
-        <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle 
Write</span></th>
-        <th>Shuffle Spill (Memory)</th>
-        <th>Shuffle Spill (Disk)</th>
+        {if (hasInput) {
+          <th>
+            <span data-toggle="tooltip" title={ToolTips.INPUT}>Input Size / 
Records</span>
+          </th>
+        }}
+        {if (hasOutput) {
+          <th>
+            <span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output Size / 
Records</span>
+          </th>
+        }}
+        {if (hasShuffleRead) {
+          <th>
+            <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>
+            Shuffle Read Size / Records</span>
+          </th>
+        }}
+        {if (hasShuffleWrite) {
+          <th>
+            <span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>
+            Shuffle Write Size / Records</span>
+          </th>
+        }}
+        {if (hasBytesSpilled) {
+          <th>Shuffle Spill (Memory)</th>
+          <th>Shuffle Spill (Disk)</th>
+        }}
       </thead>
       <tbody>
         {createExecutorTable()}
@@ -76,18 +110,34 @@ private[ui] class ExecutorTable(stageId: Int, 
stageAttemptId: Int, parent: Stage
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>
-            <td sorttable_customkey={v.inputBytes.toString}>
-              {Utils.bytesToString(v.inputBytes)}</td>
-            <td sorttable_customkey={v.outputBytes.toString}>
-              {Utils.bytesToString(v.outputBytes)}</td>
-            <td sorttable_customkey={v.shuffleRead.toString}>
-              {Utils.bytesToString(v.shuffleRead)}</td>
-            <td sorttable_customkey={v.shuffleWrite.toString}>
-              {Utils.bytesToString(v.shuffleWrite)}</td>
-            <td sorttable_customkey={v.memoryBytesSpilled.toString}>
-              {Utils.bytesToString(v.memoryBytesSpilled)}</td>
-            <td sorttable_customkey={v.diskBytesSpilled.toString}>
-              {Utils.bytesToString(v.diskBytesSpilled)}</td>
+            {if (stageData.hasInput) {
+              <td sorttable_customkey={v.inputBytes.toString}>
+                {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"}
+              </td>
+            }}
+            {if (stageData.hasOutput) {
+              <td sorttable_customkey={v.outputBytes.toString}>
+                {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"}
+              </td>
+            }}
+            {if (stageData.hasShuffleRead) {
+              <td sorttable_customkey={v.shuffleRead.toString}>
+                {s"${Utils.bytesToString(v.shuffleRead)} / 
${v.shuffleReadRecords}"}
+              </td>
+            }}
+            {if (stageData.hasShuffleWrite) {
+              <td sorttable_customkey={v.shuffleWrite.toString}>
+                {s"${Utils.bytesToString(v.shuffleWrite)} / 
${v.shuffleWriteRecords}"}
+              </td>
+            }}
+            {if (stageData.hasBytesSpilled) {
+              <td sorttable_customkey={v.memoryBytesSpilled.toString}>
+                {Utils.bytesToString(v.memoryBytesSpilled)}
+              </td>
+              <td sorttable_customkey={v.diskBytesSpilled.toString}>
+                {Utils.bytesToString(v.diskBytesSpilled)}
+              </td>
+            }}
           </tr>
         }
       case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 4d200ee..f463f8d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -394,24 +394,48 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
     stageData.shuffleWriteBytes += shuffleWriteDelta
     execSummary.shuffleWrite += shuffleWriteDelta
 
+    val shuffleWriteRecordsDelta =
+      
(taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
+      - 
oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L))
+    stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
+    execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
+
     val shuffleReadDelta =
       (taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L)
       - 
oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L))
     stageData.shuffleReadBytes += shuffleReadDelta
     execSummary.shuffleRead += shuffleReadDelta
 
+    val shuffleReadRecordsDelta =
+      (taskMetrics.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L)
+      - 
oldMetrics.flatMap(_.shuffleReadMetrics).map(_.recordsRead).getOrElse(0L))
+    stageData.shuffleReadRecords += shuffleReadRecordsDelta
+    execSummary.shuffleReadRecords += shuffleReadRecordsDelta
+
     val inputBytesDelta =
       (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
       - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
     stageData.inputBytes += inputBytesDelta
     execSummary.inputBytes += inputBytesDelta
 
+    val inputRecordsDelta =
+      (taskMetrics.inputMetrics.map(_.recordsRead).getOrElse(0L)
+      - oldMetrics.flatMap(_.inputMetrics).map(_.recordsRead).getOrElse(0L))
+    stageData.inputRecords += inputRecordsDelta
+    execSummary.inputRecords += inputRecordsDelta
+
     val outputBytesDelta =
       (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
         - 
oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
     stageData.outputBytes += outputBytesDelta
     execSummary.outputBytes += outputBytesDelta
 
+    val outputRecordsDelta =
+      (taskMetrics.outputMetrics.map(_.recordsWritten).getOrElse(0L)
+        - 
oldMetrics.flatMap(_.outputMetrics).map(_.recordsWritten).getOrElse(0L))
+    stageData.outputRecords += outputRecordsDelta
+    execSummary.outputRecords += outputRecordsDelta
+
     val diskSpillDelta =
       taskMetrics.diskBytesSpilled - 
oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
     stageData.diskBytesSpilled += diskSpillDelta

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index d8be1b2..02a3cc3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,11 +56,6 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
       val numCompleted = tasks.count(_.taskInfo.finished)
       val accumulables = listener.stageIdToData((stageId, 
stageAttemptId)).accumulables
       val hasAccumulators = accumulables.size > 0
-      val hasInput = stageData.inputBytes > 0
-      val hasOutput = stageData.outputBytes > 0
-      val hasShuffleRead = stageData.shuffleReadBytes > 0
-      val hasShuffleWrite = stageData.shuffleWriteBytes > 0
-      val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && 
stageData.diskBytesSpilled > 0
 
       val summary =
         <div>
@@ -69,31 +64,33 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
               <strong>Total task time across all tasks: </strong>
               {UIUtils.formatDuration(stageData.executorRunTime)}
             </li>
-            {if (hasInput) {
+            {if (stageData.hasInput) {
               <li>
-                <strong>Input: </strong>
-                {Utils.bytesToString(stageData.inputBytes)}
+                <strong>Input Size / Records: </strong>
+                {s"${Utils.bytesToString(stageData.inputBytes)} / 
${stageData.inputRecords}"}
               </li>
             }}
-            {if (hasOutput) {
+            {if (stageData.hasOutput) {
               <li>
                 <strong>Output: </strong>
-                {Utils.bytesToString(stageData.outputBytes)}
+                {s"${Utils.bytesToString(stageData.outputBytes)} / 
${stageData.outputRecords}"}
               </li>
             }}
-            {if (hasShuffleRead) {
+            {if (stageData.hasShuffleRead) {
               <li>
                 <strong>Shuffle read: </strong>
-                {Utils.bytesToString(stageData.shuffleReadBytes)}
+                {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
+                 s"${stageData.shuffleReadRecords}"}
               </li>
             }}
-            {if (hasShuffleWrite) {
+            {if (stageData.hasShuffleWrite) {
               <li>
                 <strong>Shuffle write: </strong>
-                {Utils.bytesToString(stageData.shuffleWriteBytes)}
+                 {s"${Utils.bytesToString(stageData.shuffleWriteBytes)} / " +
+                 s"${stageData.shuffleWriteRecords}"}
               </li>
             }}
-            {if (hasBytesSpilled) {
+            {if (stageData.hasBytesSpilled) {
               <li>
                 <strong>Shuffle spill (memory): </strong>
                 {Utils.bytesToString(stageData.memoryBytesSpilled)}
@@ -132,7 +129,7 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
                   <span class="additional-metric-title">Task Deserialization 
Time</span>
                 </span>
               </li>
-              {if (hasShuffleRead) {
+              {if (stageData.hasShuffleRead) {
                 <li>
                   <span data-toggle="tooltip"
                         title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} 
data-placement="right">
@@ -174,25 +171,32 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
           ("Result Serialization Time", 
TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
           ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) 
++
         {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
-        {if (hasInput) Seq(("Input", "")) else Nil} ++
-        {if (hasOutput) Seq(("Output", "")) else Nil} ++
-        {if (hasShuffleRead) {
+        {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++
+        {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} 
++
+        {if (stageData.hasShuffleRead) {
           Seq(("Shuffle Read Blocked Time", 
TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
-            ("Shuffle Read", ""))
+            ("Shuffle Read Size / Records", ""))
+        } else {
+          Nil
+        }} ++
+        {if (stageData.hasShuffleWrite) {
+          Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+        } else {
+          Nil
+        }} ++
+        {if (stageData.hasBytesSpilled) {
+          Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
         } else {
           Nil
         }} ++
-        {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) 
else Nil} ++
-        {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle 
Spill (Disk)", ""))
-          else Nil} ++
         Seq(("Errors", ""))
 
       val unzipped = taskHeadersAndCssClasses.unzip
 
       val taskTable = UIUtils.listingTable(
         unzipped._1,
-        taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, 
hasShuffleWrite,
-          hasBytesSpilled),
+        taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput,
+          stageData.hasShuffleRead, stageData.hasShuffleWrite, 
stageData.hasBytesSpilled),
         tasks,
         headerClasses = unzipped._2)
       // Excludes tasks which failed and have incomplete metrics
@@ -203,8 +207,11 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
           None
         }
         else {
+          def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] =
+            Distribution(data).get.getQuantiles()
+
           def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
-            Distribution(times).get.getQuantiles().map { millis =>
+            getDistributionQuantiles(times).map { millis =>
               <td>{UIUtils.formatDuration(millis.toLong)}</td>
             }
           }
@@ -273,17 +280,36 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
             getFormattedTimeQuantiles(schedulerDelays)
 
           def getFormattedSizeQuantiles(data: Seq[Double]) =
-            Distribution(data).get.getQuantiles().map(d => 
<td>{Utils.bytesToString(d.toLong)}</td>)
+            getDistributionQuantiles(data).map(d => 
<td>{Utils.bytesToString(d.toLong)}</td>)
+
+          def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: 
Seq[Double]) = {
+            val recordDist = getDistributionQuantiles(records).iterator
+            getDistributionQuantiles(data).map(d =>
+              <td>{s"${Utils.bytesToString(d.toLong)} / 
${recordDist.next().toLong}"}</td>
+            )
+          }
 
           val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
           }
-          val inputQuantiles = <td>Input</td> +: 
getFormattedSizeQuantiles(inputSizes)
+
+          val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+            metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+          }
+
+          val inputQuantiles = <td>Input Size / Records</td> +:
+            getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
 
           val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
             
metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
           }
-          val outputQuantiles = <td>Output</td> +: 
getFormattedSizeQuantiles(outputSizes)
+
+          val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) 
=>
+            
metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+          }
+
+          val outputQuantiles = <td>Output Size / Records</td> +:
+            getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
 
           val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, 
metrics, _) =>
             
metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
@@ -294,14 +320,24 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
           val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, 
_) =>
             
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
           }
-          val shuffleReadQuantiles = <td>Shuffle Read (Remote)</td> +:
-            getFormattedSizeQuantiles(shuffleReadSizes)
+
+          val shuffleReadRecords = validTasks.map { case TaskUIData(_, 
metrics, _) =>
+            
metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+          }
+
+          val shuffleReadQuantiles = <td>Shuffle Read Size / Records 
(Remote)</td> +:
+            getFormattedSizeQuantilesWithRecords(shuffleReadSizes, 
shuffleReadRecords)
 
           val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, 
_) =>
             
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
           }
-          val shuffleWriteQuantiles = <td>Shuffle Write</td> +:
-            getFormattedSizeQuantiles(shuffleWriteSizes)
+
+          val shuffleWriteRecords = validTasks.map { case TaskUIData(_, 
metrics, _) =>
+            
metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
+          }
+
+          val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
+            getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, 
shuffleWriteRecords)
 
           val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, 
metrics, _) =>
             metrics.get.memoryBytesSpilled.toDouble
@@ -326,9 +362,9 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
               {serializationQuantiles}
             </tr>,
             <tr 
class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
-            if (hasInput) <tr>{inputQuantiles}</tr> else Nil,
-            if (hasOutput) <tr>{outputQuantiles}</tr> else Nil,
-            if (hasShuffleRead) {
+            if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
+            if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
+            if (stageData.hasShuffleRead) {
               <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
                 {shuffleReadBlockedQuantiles}
               </tr>
@@ -336,9 +372,9 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
             } else {
               Nil
             },
-            if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
-            if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else 
Nil,
-            if (hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
+            if (stageData.hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> 
else Nil,
+            if (stageData.hasBytesSpilled) 
<tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
+            if (stageData.hasBytesSpilled) 
<tr>{diskBytesSpilledQuantiles}</tr> else Nil)
 
           val quantileHeaders = Seq("Metric", "Min", "25th percentile",
             "Median", "75th percentile", "Max")
@@ -397,26 +433,32 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
       val inputReadable = maybeInput
         .map(m => s"${Utils.bytesToString(m.bytesRead)} 
(${m.readMethod.toString.toLowerCase()})")
         .getOrElse("")
+      val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
 
       val maybeOutput = metrics.flatMap(_.outputMetrics)
       val outputSortable = 
maybeOutput.map(_.bytesWritten.toString).getOrElse("")
       val outputReadable = maybeOutput
         .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
         .getOrElse("")
+      val outputRecords = 
maybeOutput.map(_.recordsWritten.toString).getOrElse("")
 
-      val maybeShuffleReadBlockedTime = 
metrics.flatMap(_.shuffleReadMetrics).map(_.fetchWaitTime)
-      val shuffleReadBlockedTimeSortable = 
maybeShuffleReadBlockedTime.map(_.toString).getOrElse("")
+      val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
+      val shuffleReadBlockedTimeSortable = maybeShuffleRead
+        .map(_.fetchWaitTime.toString).getOrElse("")
       val shuffleReadBlockedTimeReadable =
-        maybeShuffleReadBlockedTime.map(ms => 
UIUtils.formatDuration(ms)).getOrElse("")
+        maybeShuffleRead.map(ms => 
UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
 
-      val maybeShuffleRead = 
metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
-      val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
-      val shuffleReadReadable = 
maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
+      val shuffleReadSortable = 
maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("")
+      val shuffleReadReadable = maybeShuffleRead
+        .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("")
+      val shuffleReadRecords = 
maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
 
-      val maybeShuffleWrite =
-        metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten)
-      val shuffleWriteSortable = 
maybeShuffleWrite.map(_.toString).getOrElse("")
-      val shuffleWriteReadable = 
maybeShuffleWrite.map(Utils.bytesToString).getOrElse("")
+      val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
+      val shuffleWriteSortable = 
maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
+      val shuffleWriteReadable = maybeShuffleWrite
+        .map(m => 
s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+      val shuffleWriteRecords = maybeShuffleWrite
+        .map(_.shuffleRecordsWritten.toString).getOrElse("")
 
       val maybeWriteTime = 
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
       val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
@@ -472,12 +514,12 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
         }}
         {if (hasInput) {
           <td sorttable_customkey={inputSortable}>
-            {inputReadable}
+            {s"$inputReadable / $inputRecords"}
           </td>
         }}
         {if (hasOutput) {
           <td sorttable_customkey={outputSortable}>
-            {outputReadable}
+            {s"$outputReadable / $outputRecords"}
           </td>
         }}
         {if (hasShuffleRead) {
@@ -486,7 +528,7 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
              {shuffleReadBlockedTimeReadable}
            </td>
            <td sorttable_customkey={shuffleReadSortable}>
-             {shuffleReadReadable}
+             {s"$shuffleReadReadable / $shuffleReadRecords"}
            </td>
         }}
         {if (hasShuffleWrite) {
@@ -494,7 +536,7 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
              {writeTimeReadable}
            </td>
            <td sorttable_customkey={shuffleWriteSortable}>
-             {shuffleWriteReadable}
+             {s"$shuffleWriteReadable / $shuffleWriteRecords"}
            </td>
         }}
         {if (hasBytesSpilled) {

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 01f7e23..69aac6c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -31,9 +31,13 @@ private[jobs] object UIData {
     var failedTasks : Int = 0
     var succeededTasks : Int = 0
     var inputBytes : Long = 0
+    var inputRecords : Long = 0
     var outputBytes : Long = 0
+    var outputRecords : Long = 0
     var shuffleRead : Long = 0
+    var shuffleReadRecords : Long = 0
     var shuffleWrite : Long = 0
+    var shuffleWriteRecords : Long = 0
     var memoryBytesSpilled : Long = 0
     var diskBytesSpilled : Long = 0
   }
@@ -73,9 +77,13 @@ private[jobs] object UIData {
     var executorRunTime: Long = _
 
     var inputBytes: Long = _
+    var inputRecords: Long = _
     var outputBytes: Long = _
+    var outputRecords: Long = _
     var shuffleReadBytes: Long = _
+    var shuffleReadRecords : Long = _
     var shuffleWriteBytes: Long = _
+    var shuffleWriteRecords: Long = _
     var memoryBytesSpilled: Long = _
     var diskBytesSpilled: Long = _
 
@@ -85,6 +93,12 @@ private[jobs] object UIData {
     var accumulables = new HashMap[Long, AccumulableInfo]
     var taskData = new HashMap[Long, TaskUIData]
     var executorSummary = new HashMap[String, ExecutorSummary]
+
+    def hasInput = inputBytes > 0
+    def hasOutput = outputBytes > 0
+    def hasShuffleRead = shuffleReadBytes > 0
+    def hasShuffleWrite = shuffleWriteBytes > 0
+    def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c8407bb..b0b5456 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -293,22 +293,26 @@ private[spark] object JsonProtocol {
     ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
     ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
     ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
-    ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead)
+    ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
+    ("Total Records Read" -> shuffleReadMetrics.recordsRead)
   }
 
   def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): 
JValue = {
     ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~
-    ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
+    ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~
+    ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten)
   }
 
   def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
     ("Data Read Method" -> inputMetrics.readMethod.toString) ~
-    ("Bytes Read" -> inputMetrics.bytesRead)
+    ("Bytes Read" -> inputMetrics.bytesRead) ~
+    ("Records Read" -> inputMetrics.recordsRead)
   }
 
   def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = {
     ("Data Write Method" -> outputMetrics.writeMethod.toString) ~
-    ("Bytes Written" -> outputMetrics.bytesWritten)
+    ("Bytes Written" -> outputMetrics.bytesWritten) ~
+    ("Records Written" -> outputMetrics.recordsWritten)
   }
 
   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
@@ -670,6 +674,7 @@ private[spark] object JsonProtocol {
     metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
     metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
     metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
+    metrics.incRecordsRead((json \ "Total Records 
Read").extractOpt[Long].getOrElse(0))
     metrics
   }
 
@@ -677,13 +682,16 @@ private[spark] object JsonProtocol {
     val metrics = new ShuffleWriteMetrics
     metrics.incShuffleBytesWritten((json \ "Shuffle Bytes 
Written").extract[Long])
     metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
+    metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written")
+      .extractOpt[Long].getOrElse(0))
     metrics
   }
 
   def inputMetricsFromJson(json: JValue): InputMetrics = {
     val metrics = new InputMetrics(
       DataReadMethod.withName((json \ "Data Read Method").extract[String]))
-    metrics.addBytesRead((json \ "Bytes Read").extract[Long])
+    metrics.incBytesRead((json \ "Bytes Read").extract[Long])
+    metrics.incRecordsRead((json \ "Records 
Read").extractOpt[Long].getOrElse(0))
     metrics
   }
 
@@ -691,6 +699,7 @@ private[spark] object JsonProtocol {
     val metrics = new OutputMetrics(
       DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
     metrics.setBytesWritten((json \ "Bytes Written").extract[Long])
+    metrics.setRecordsWritten((json \ "Records 
Written").extractOpt[Long].getOrElse(0))
     metrics
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6ba0384..eaec5a7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -763,6 +763,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (curWriteMetrics != null) {
         m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
         m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
+        m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 81db66a..78fa98a 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -21,44 +21,46 @@ import java.io.{File, FileWriter, PrintWriter}
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.FunSuite
-
+import org.apache.commons.lang.math.RandomUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => 
OldInputSplit, JobConf,
-  LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, 
Reporter,
-  TextInputFormat => OldTextInputFormat}
 import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => 
OldCombineFileInputFormat,
-  CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => 
OldCombineFileRecordReader}
-import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader 
=> NewRecordReader,
-  TaskAttemptContext}
+  CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => 
OldCombineFileSplit}
+import org.apache.hadoop.mapred.{JobConf, Reporter, FileSplit => OldFileSplit,
+  InputSplit => OldInputSplit, LineRecordReader => OldLineRecordReader,
+  RecordReader => OldRecordReader, TextInputFormat => OldTextInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => 
NewCombineFileInputFormat,
   CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => 
NewCombineFileSplit,
   FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => 
NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.{TaskAttemptContext, InputSplit => 
NewInputSplit,
+  RecordReader => NewRecordReader}
+import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.SharedSparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.util.Utils
 
-class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
+  with BeforeAndAfter {
 
   @transient var tmpDir: File = _
   @transient var tmpFile: File = _
   @transient var tmpFilePath: String = _
+  @transient val numRecords: Int = 100000
+  @transient val numBuckets: Int = 10
 
-  override def beforeAll() {
-    super.beforeAll()
-
+  before {
     tmpDir = Utils.createTempDir()
     val testTempDir = new File(tmpDir, "test")
     testTempDir.mkdir()
 
     tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
     val pw = new PrintWriter(new FileWriter(tmpFile))
-    for (x <- 1 to 1000000) {
-      pw.println("s")
+    for (x <- 1 to numRecords) {
+      pw.println(RandomUtils.nextInt(numBuckets))
     }
     pw.close()
 
@@ -66,8 +68,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
     tmpFilePath = "file://" + tmpFile.getAbsolutePath
   }
 
-  override def afterAll() {
-    super.afterAll()
+  after {
     Utils.deleteRecursively(tmpDir)
   }
 
@@ -155,6 +156,101 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
     assert(bytesRead >= tmpFile.length())
   }
 
+  test("input metrics on records read - simple") {
+    val records = runAndReturnRecordsRead {
+      sc.textFile(tmpFilePath, 4).count()
+    }
+    assert(records == numRecords)
+  }
+
+  test("input metrics on records read - more stages") {
+    val records = runAndReturnRecordsRead {
+      sc.textFile(tmpFilePath, 4)
+        .map(key => (key.length, 1))
+        .reduceByKey(_ + _)
+        .count()
+    }
+    assert(records == numRecords)
+  }
+
+  test("input metrics on records - New Hadoop API") {
+    val records = runAndReturnRecordsRead {
+      sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+        classOf[Text]).count()
+    }
+    assert(records == numRecords)
+  }
+
+  test("input metrics on recordsd read with cache") {
+    // prime the cache manager
+    val rdd = sc.textFile(tmpFilePath, 4).cache()
+    rdd.collect()
+
+    val records = runAndReturnRecordsRead {
+      rdd.count()
+    }
+
+    assert(records == numRecords)
+  }
+
+  test("shuffle records read metrics") {
+    val recordsRead = runAndReturnShuffleRecordsRead {
+      sc.textFile(tmpFilePath, 4)
+        .map(key => (key, 1))
+        .groupByKey()
+        .collect()
+    }
+    assert(recordsRead == numRecords)
+  }
+
+  test("shuffle records written metrics") {
+    val recordsWritten = runAndReturnShuffleRecordsWritten {
+      sc.textFile(tmpFilePath, 4)
+        .map(key => (key, 1))
+        .groupByKey()
+        .collect()
+    }
+    assert(recordsWritten == numRecords)
+  }
+
+  /**
+   * Tests the metrics from end to end:
+   * 1) reading a hadoop file
+   * 2) shuffling the data
+   * 3) writing to a hadoop file
+   */
+  test("input read/write and shuffle read/write metrics all line up") {
+    var inputRead = 0L
+    var outputWritten = 0L
+    var shuffleRead = 0L
+    var shuffleWritten = 0L
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        val metrics = taskEnd.taskMetrics
+        metrics.inputMetrics.foreach(inputRead += _.recordsRead)
+        metrics.outputMetrics.foreach(outputWritten += _.recordsWritten)
+        metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead)
+        metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten)
+      }
+    })
+
+    val tmpFile = new File(tmpDir, getClass.getSimpleName)
+
+    sc.textFile(tmpFilePath, 4)
+      .map(key => (key, 1))
+      .reduceByKey(_+_)
+      .saveAsTextFile("file://" + tmpFile.getAbsolutePath)
+
+    sc.listenerBus.waitUntilEmpty(500)
+    assert(inputRead == numRecords)
+
+    // Only supported on newer Hadoop
+    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+      assert(outputWritten == numBuckets)
+    }
+    assert(shuffleRead == shuffleWritten)
+  }
+
   test("input metrics with interleaved reads") {
     val numPartitions = 2
     val cartVector = 0 to 9
@@ -193,18 +289,66 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
     assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize))
   }
 
-  private def runAndReturnBytesRead(job : => Unit): Long = {
-    val taskBytesRead = new ArrayBuffer[Long]()
+  private def runAndReturnBytesRead(job: => Unit): Long = {
+    runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.bytesRead))
+  }
+
+  private def runAndReturnRecordsRead(job: => Unit): Long = {
+    runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.recordsRead))
+  }
+
+  private def runAndReturnRecordsWritten(job: => Unit): Long = {
+    runAndReturnMetrics(job, _.taskMetrics.outputMetrics.map(_.recordsWritten))
+  }
+
+  private def runAndReturnShuffleRecordsRead(job: => Unit): Long = {
+    runAndReturnMetrics(job, _.taskMetrics.shuffleReadMetrics.map(_.recordsRead))
+  }
+
+  private def runAndReturnShuffleRecordsWritten(job: => Unit): Long = {
+    runAndReturnMetrics(job, _.taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten))
+  }
+
+  private def runAndReturnMetrics(job: => Unit,
+      collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
+    val taskMetrics = new ArrayBuffer[Long]()
     sc.addSparkListener(new SparkListener() {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+        collector(taskEnd).foreach(taskMetrics += _)
       }
     })
 
     job
 
     sc.listenerBus.waitUntilEmpty(500)
-    taskBytesRead.sum
+    taskMetrics.sum
+  }
+
+  test("output metrics on records written") {
+    // Only supported on newer Hadoop
+    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+      val file = new File(tmpDir, getClass.getSimpleName)
+      val filePath = "file://" + file.getAbsolutePath
+
+      val records = runAndReturnRecordsWritten {
+        sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
+      }
+      assert(records == numRecords)
+    }
+  }
+
+  test("output metrics on records written - new Hadoop API") {
+    // Only supported on newer Hadoop
+    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+      val file = new File(tmpDir, getClass.getSimpleName)
+      val filePath = "file://" + file.getAbsolutePath
+
+      val records = runAndReturnRecordsWritten {
+        sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
+          .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
+      }
+      assert(records == numRecords)
+    }
   }
 
   test("output metrics when writing text file") {
@@ -318,4 +462,4 @@ class NewCombineTextRecordReaderWrapper(
   override def getCurrentValue(): Text = delegate.getCurrentValue
   override def getProgress(): Float = delegate.getProgress
   override def close(): Unit = delegate.close()
-}
\ No newline at end of file
+}

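The InputOutputMetricsSuite changes above all funnel through one pattern: register a SparkListener, run a job, and sum a single Option-valued metric across SparkListenerTaskEnd events. A hedged, standalone sketch of that pattern against the same era of the API; the app name, the args(0) input path, and the Thread.sleep standing in for the suite's Spark-internal sc.listenerBus.waitUntilEmpty(500) are all illustrative choices:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object RecordsReadExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("records-read-example"))
    var recordsRead = 0L
    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        // inputMetrics is an Option in this era of the API, hence foreach.
        taskEnd.taskMetrics.inputMetrics.foreach(recordsRead += _.recordsRead)
      }
    })
    sc.textFile(args(0), 4).count()
    // The suite drains the event bus via the Spark-internal
    // sc.listenerBus.waitUntilEmpty(500); plain user code settles for a short wait.
    Thread.sleep(500)
    println(s"records read: $recordsRead")
    sc.stop()
  }
}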
http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
index bbc7e13..c21c92b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
@@ -31,6 +31,8 @@ class BlockObjectWriterSuite extends FunSuite {
       new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20))
+    // Record metrics update on every write
+    assert(writeMetrics.shuffleRecordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.shuffleBytesWritten == 0)
     // After 32 writes, metrics should update
@@ -39,6 +41,7 @@ class BlockObjectWriterSuite extends FunSuite {
       writer.write(Long.box(i))
     }
     assert(writeMetrics.shuffleBytesWritten > 0)
+    assert(writeMetrics.shuffleRecordsWritten === 33)
     writer.commitAndClose()
     assert(file.length() == writeMetrics.shuffleBytesWritten)
   }
@@ -51,6 +54,8 @@ class BlockObjectWriterSuite extends FunSuite {
       new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20))
+    // Record metrics update on every write
+    assert(writeMetrics.shuffleRecordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.shuffleBytesWritten == 0)
     // After 32 writes, metrics should update
@@ -59,7 +64,23 @@ class BlockObjectWriterSuite extends FunSuite {
       writer.write(Long.box(i))
     }
     assert(writeMetrics.shuffleBytesWritten > 0)
+    assert(writeMetrics.shuffleRecordsWritten === 33)
     writer.revertPartialWritesAndClose()
     assert(writeMetrics.shuffleBytesWritten == 0)
+    assert(writeMetrics.shuffleRecordsWritten == 0)
+  }
+
+  test("Reopening a closed block writer") {
+    val file = new File("somefile")
+    file.deleteOnExit()
+    val writeMetrics = new ShuffleWriteMetrics()
+    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
+      new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+
+    writer.open()
+    writer.close()
+    intercept[IllegalStateException] {
+      writer.open()
+    }
   }
 }

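The new "Reopening a closed block writer" test pins down the guard referred to in the commit log: once a writer has been closed, open() must fail rather than silently reset its metrics. A self-contained sketch of such a guard, using an illustrative OneShotFileWriter rather than Spark's DiskBlockObjectWriter:

import java.io.{File, FileOutputStream, OutputStream}

// Illustrative writer: open() throws IllegalStateException once close() has run.
class OneShotFileWriter(file: File) {
  private var out: OutputStream = null
  private var hasBeenClosed = false

  def open(): this.type = {
    if (hasBeenClosed) {
      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
    }
    out = new FileOutputStream(file, true)
    this
  }

  def close(): Unit = {
    if (out != null) {
      out.close()
      out = null
    }
    hasBeenClosed = true // once set, open() refuses to run again
  }
}

Against a writer like this, the test above reduces to open, close, then intercept[IllegalStateException] { writer.open() }.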
http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 68074ae..e8405ba 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -234,7 +234,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       taskMetrics.incMemoryBytesSpilled(base + 6)
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
       taskMetrics.setInputMetrics(Some(inputMetrics))
-      inputMetrics.addBytesRead(base + 7)
+      inputMetrics.incBytesRead(base + 7)
       val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
       taskMetrics.outputMetrics = Some(outputMetrics)
       outputMetrics.setBytesWritten(base + 8)

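The one-line change above tracks the rename of the byte-count mutator from addBytesRead to incBytesRead. A hedged sketch of a fixture built with the renamed inc*/set* mutators; these setters are Spark-internal, so the snippet assumes it lives under the org.apache.spark package the way the suite does:

package org.apache.spark

import org.apache.spark.executor.{DataReadMethod, DataWriteMethod, InputMetrics, OutputMetrics, TaskMetrics}

// Builds a TaskMetrics fixture the way the listener suite does, bumping both
// the byte and the new record counters through the renamed mutators.
object MetricsFixtureExample {
  def makeMetrics(base: Int): TaskMetrics = {
    val taskMetrics = new TaskMetrics
    val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
    taskMetrics.setInputMetrics(Some(inputMetrics))
    inputMetrics.incBytesRead(base + 7)
    inputMetrics.incRecordsRead(base + 7)
    val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
    taskMetrics.outputMetrics = Some(outputMetrics)
    outputMetrics.setBytesWritten(base + 8)
    outputMetrics.setRecordsWritten(base + 8)
    taskMetrics
  }
}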
http://git-wip-us.apache.org/repos/asf/spark/blob/dcd1e42d/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 842f545..f3017dc 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -189,6 +189,34 @@ class JsonProtocolSuite extends FunSuite {
     assert(newMetrics.inputMetrics.isEmpty)
   }
 
+  test("Input/Output records backwards compatibility") {
+    // records read were added after 1.2
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+      hasHadoopInput = true, hasOutput = true, hasRecords = false)
+    assert(metrics.inputMetrics.nonEmpty)
+    assert(metrics.outputMetrics.nonEmpty)
+    val newJson = JsonProtocol.taskMetricsToJson(metrics)
+    val oldJson = newJson.removeField { case (field, _) => field == "Records Read" }
+                         .removeField { case (field, _) => field == "Records Written" }
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.inputMetrics.get.recordsRead == 0)
+    assert(newMetrics.outputMetrics.get.recordsWritten == 0)
+  }
+
+  test("Shuffle Read/Write records backwards compatibility") {
+    // records read were added after 1.2
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+      hasHadoopInput = false, hasOutput = false, hasRecords = false)
+    assert(metrics.shuffleReadMetrics.nonEmpty)
+    assert(metrics.shuffleWriteMetrics.nonEmpty)
+    val newJson = JsonProtocol.taskMetricsToJson(metrics)
+    val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" }
+                         .removeField { case (field, _) => field == "Shuffle Records Written" }
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
+    assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0)
+  }
+
   test("OutputMetrics backward compatibility") {
     // OutputMetrics were added after 1.1
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
@@ -644,7 +672,8 @@ class JsonProtocolSuite extends FunSuite {
       e: Int,
       f: Int,
       hasHadoopInput: Boolean,
-      hasOutput: Boolean) = {
+      hasOutput: Boolean,
+      hasRecords: Boolean = true) = {
     val t = new TaskMetrics
     t.setHostname("localhost")
     t.setExecutorDeserializeTime(a)
@@ -656,7 +685,8 @@ class JsonProtocolSuite extends FunSuite {
 
     if (hasHadoopInput) {
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      inputMetrics.addBytesRead(d + e + f)
+      inputMetrics.incBytesRead(d + e + f)
+      inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
       t.setInputMetrics(Some(inputMetrics))
     } else {
       val sr = new ShuffleReadMetrics
@@ -664,16 +694,19 @@ class JsonProtocolSuite extends FunSuite {
       sr.incLocalBlocksFetched(e)
       sr.incFetchWaitTime(a + d)
       sr.incRemoteBlocksFetched(f)
+      sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
       t.setShuffleReadMetrics(Some(sr))
     }
     if (hasOutput) {
       val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
       outputMetrics.setBytesWritten(a + b + c)
+      outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1)
       t.outputMetrics = Some(outputMetrics)
     } else {
       val sw = new ShuffleWriteMetrics
       sw.incShuffleBytesWritten(a + b + c)
       sw.incShuffleWriteTime(b + c + d)
+      sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
       t.shuffleWriteMetrics = Some(sw)
     }
     // Make at most 6 blocks
@@ -907,11 +940,13 @@ class JsonProtocolSuite extends FunSuite {
       |      "Remote Blocks Fetched": 800,
       |      "Local Blocks Fetched": 700,
       |      "Fetch Wait Time": 900,
-      |      "Remote Bytes Read": 1000
+      |      "Remote Bytes Read": 1000,
+      |      "Total Records Read" : 10
       |    },
       |    "Shuffle Write Metrics": {
       |      "Shuffle Bytes Written": 1200,
-      |      "Shuffle Write Time": 1500
+      |      "Shuffle Write Time": 1500,
+      |      "Shuffle Records Written": 12
       |    },
       |    "Updated Blocks": [
       |      {
@@ -988,11 +1023,13 @@ class JsonProtocolSuite extends FunSuite {
       |    "Disk Bytes Spilled": 0,
       |    "Shuffle Write Metrics": {
       |      "Shuffle Bytes Written": 1200,
-      |      "Shuffle Write Time": 1500
+      |      "Shuffle Write Time": 1500,
+      |      "Shuffle Records Written": 12
       |    },
       |    "Input Metrics": {
       |      "Data Read Method": "Hadoop",
-      |      "Bytes Read": 2100
+      |      "Bytes Read": 2100,
+      |      "Records Read": 21
       |    },
       |    "Updated Blocks": [
       |      {
@@ -1069,11 +1106,13 @@ class JsonProtocolSuite extends FunSuite {
       |    "Disk Bytes Spilled": 0,
       |    "Input Metrics": {
       |      "Data Read Method": "Hadoop",
-      |      "Bytes Read": 2100
+      |      "Bytes Read": 2100,
+      |      "Records Read": 21
       |    },
       |    "Output Metrics": {
       |      "Data Write Method": "Hadoop",
-      |      "Bytes Written": 1200
+      |      "Bytes Written": 1200,
+      |      "Records Written": 12
       |    },
       |    "Updated Blocks": [
       |      {

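The two backwards-compatibility tests above strip the new record-count fields from freshly serialized JSON and expect deserialization to fall back to a default rather than fail. A hedged sketch of that read-side pattern with json4s; the helper is illustrative, not Spark's JsonProtocol, and only the field names are taken from the fixtures above:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object RecordFieldFallback {
  implicit val formats: Formats = DefaultFormats

  // Missing fields come back as JNothing, so extractOpt yields None and we
  // substitute a default rather than throwing on events written by older versions.
  def recordsReadOrDefault(inputMetricsJson: JValue, default: Long = 0L): Long =
    (inputMetricsJson \ "Records Read").extractOpt[Long].getOrElse(default)

  def main(args: Array[String]): Unit = {
    val newEvent = parse("""{"Data Read Method": "Hadoop", "Bytes Read": 2100, "Records Read": 21}""")
    val oldEvent = parse("""{"Data Read Method": "Hadoop", "Bytes Read": 2100}""")
    println(recordsReadOrDefault(newEvent)) // 21
    println(recordsReadOrDefault(oldEvent)) // 0 -- old events simply lack the field
  }
}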

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
