Repository: spark
Updated Branches:
  refs/heads/master c4830598b -> 1f6b0b123


[SPARK-8701] [STREAMING] [WEBUI] Add input metadata in the batch page

This PR adds `metadata` to `InputInfo`. `InputDStream` can report its metadata 
for a batch and it will be shown in the batch page.

For example,

![screen 
shot](https://cloud.githubusercontent.com/assets/1000778/8403741/d6ffc7e2-1e79-11e5-9888-c78c1575123a.png)

FileInputDStream will display the new files for a batch, and 
DirectKafkaInputDStream will display its offset ranges.

Author: zsxwing <zsxw...@gmail.com>

Closes #7081 from zsxwing/input-metadata and squashes the following commits:

f7abd9b [zsxwing] Revert the space changes in project/MimaExcludes.scala
d906209 [zsxwing] Merge branch 'master' into input-metadata
74762da [zsxwing] Fix MiMa tests
7903e33 [zsxwing] Merge branch 'master' into input-metadata
450a46c [zsxwing] Address comments
1d94582 [zsxwing] Raname InputInfo to StreamInputInfo and change "metadata" to 
Map[String, Any]
d496ae9 [zsxwing] Add input metadata in the batch page


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f6b0b12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f6b0b12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f6b0b12

Branch: refs/heads/master
Commit: 1f6b0b1234cc03aa2e07aea7fec2de7563885238
Parents: c483059
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Jul 9 13:48:29 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Jul 9 13:48:29 2015 -0700

----------------------------------------------------------------------
 .../kafka/DirectKafkaInputDStream.scala         | 23 +++++++++--
 .../spark/streaming/kafka/OffsetRange.scala     |  2 +-
 project/MimaExcludes.scala                      |  6 +++
 .../streaming/dstream/FileInputDStream.scala    | 10 ++++-
 .../dstream/ReceiverInputDStream.scala          |  4 +-
 .../spark/streaming/scheduler/BatchInfo.scala   |  9 ++--
 .../streaming/scheduler/InputInfoTracker.scala  | 38 +++++++++++++----
 .../streaming/scheduler/JobGenerator.scala      |  3 +-
 .../spark/streaming/scheduler/JobSet.scala      |  4 +-
 .../apache/spark/streaming/ui/BatchPage.scala   | 43 ++++++++++++++++++--
 .../apache/spark/streaming/ui/BatchUIData.scala |  8 ++--
 .../ui/StreamingJobProgressListener.scala       |  5 ++-
 .../streaming/StreamingListenerSuite.scala      |  6 +--
 .../apache/spark/streaming/TestSuiteBase.scala  |  2 +-
 .../scheduler/InputInfoTrackerSuite.scala       |  8 ++--
 .../ui/StreamingJobProgressListenerSuite.scala  | 28 +++++++------
 16 files changed, 148 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 876456c..48a1933 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka
 
 import scala.annotation.tailrec
 import scala.collection.mutable
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
 
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler.InputInfo
+import org.apache.spark.streaming.scheduler.StreamInputInfo
 
 /**
  *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -119,8 +119,23 @@ class DirectKafkaInputDStream[
     val rdd = KafkaRDD[K, V, U, T, R](
       context.sparkContext, kafkaParams, currentOffsets, untilOffsets, 
messageHandler)
 
-    // Report the record number of this batch interval to InputInfoTracker.
-    val inputInfo = InputInfo(id, rdd.count)
+    // Report the record number and metadata of this batch interval to 
InputInfoTracker.
+    val offsetRanges = currentOffsets.map { case (tp, fo) =>
+      val uo = untilOffsets(tp)
+      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+    }
+    val description = offsetRanges.filter { offsetRange =>
+      // Don't display empty ranges.
+      offsetRange.fromOffset != offsetRange.untilOffset
+    }.map { offsetRange =>
+      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+    }.mkString("\n")
+    // Copy offsetRanges to immutable.List to prevent from being modified by 
the user
+    val metadata = Map(
+      "offsets" -> offsetRanges.toList,
+      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
     ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
index 2675042..f326e7f 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -75,7 +75,7 @@ final class OffsetRange private(
   }
 
   override def toString(): String = {
-    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset 
-> $untilOffset]"
+    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset 
-> $untilOffset])"
   }
 
   /** this is to avoid ClassNotFoundException during checkpoint restore */

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 821aadd..79089aa 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -77,6 +77,12 @@ object MimaExcludes {
             // SPARK-8914 Remove RDDApi
             ProblemFilters.exclude[MissingClassProblem](
             "org.apache.spark.sql.RDDApi")
+          ) ++ Seq(
+            // SPARK-8701 Add input metadata in the batch page.
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.streaming.scheduler.InputInfo$"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.streaming.scheduler.InputInfo")
           )
 
         case v if v.startsWith("1.4") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 86a8e2b..dd4da9d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => 
NewInputFormat}
 
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.StreamInputInfo
 import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, 
Utils}
 
 /**
@@ -144,7 +145,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
     batchTimeToSelectedFiles += ((validTime, newFiles))
     recentlySelectedFiles ++= newFiles
-    Some(filesToRDD(newFiles))
+    val rdds = Some(filesToRDD(newFiles))
+    // Copy newFiles to immutable.List to prevent from being modified by the 
user
+    val metadata = Map(
+      "files" -> newFiles.toList,
+      StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
+    val inputInfo = StreamInputInfo(id, 0, metadata)
+    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+    rdds
   }
 
   /** Clear the old time-to-files mappings along with old RDDs */

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index e76e7eb..a50f0ef 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -24,7 +24,7 @@ import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.InputInfo
+import org.apache.spark.streaming.scheduler.StreamInputInfo
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 /**
@@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient 
ssc_ : StreamingCont
         val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] 
}.toArray
 
         // Register the input blocks information into InputInfoTracker
-        val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
+        val inputInfo = StreamInputInfo(id, 
blockInfos.flatMap(_.numRecords).sum)
         ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
         if (blockInfos.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 5b9bfbf..9922b6b 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -24,7 +24,7 @@ import org.apache.spark.streaming.Time
  * :: DeveloperApi ::
  * Class having information on completed batches.
  * @param batchTime   Time of the batch
- * @param streamIdToNumRecords A map of input stream id to record number
+ * @param streamIdToInputInfo A map of input stream id to its input info
  * @param submissionTime  Clock time of when jobs of this batch was submitted 
to
  *                        the streaming scheduler queue
  * @param processingStartTime Clock time of when the first job of this batch 
started processing
@@ -33,12 +33,15 @@ import org.apache.spark.streaming.Time
 @DeveloperApi
 case class BatchInfo(
     batchTime: Time,
-    streamIdToNumRecords: Map[Int, Long],
+    streamIdToInputInfo: Map[Int, StreamInputInfo],
     submissionTime: Long,
     processingStartTime: Option[Long],
     processingEndTime: Option[Long]
   ) {
 
+  @deprecated("Use streamIdToInputInfo instead", "1.5.0")
+  def streamIdToNumRecords: Map[Int, Long] = 
streamIdToInputInfo.mapValues(_.numRecords)
+
   /**
    * Time taken for the first job of this batch to start processing from the 
time this batch
    * was submitted to the streaming scheduler. Essentially, it is
@@ -63,5 +66,5 @@ case class BatchInfo(
   /**
    * The number of recorders received by the receivers in this batch.
    */
-  def numRecords: Long = streamIdToNumRecords.values.sum
+  def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
index 7c0db8a..363c03d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -20,11 +20,34 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable
 
 import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.streaming.{Time, StreamingContext}
 
-/** To track the information of input stream at specified batch time. */
-private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
+/**
+ * :: DeveloperApi ::
+ * Track the information of input stream at specified batch time.
+ *
+ * @param inputStreamId the input stream id
+ * @param numRecords the number of records in a batch
+ * @param metadata metadata for this batch. It should contain at least one 
standard field named
+ *                 "Description" which maps to the content that will be shown 
in the UI.
+ */
+@DeveloperApi
+case class StreamInputInfo(
+    inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = 
Map.empty) {
   require(numRecords >= 0, "numRecords must not be negative")
+
+  def metadataDescription: Option[String] =
+    metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
+}
+
+@DeveloperApi
+object StreamInputInfo {
+
+  /**
+   * The key for description in `StreamInputInfo.metadata`.
+   */
+  val METADATA_KEY_DESCRIPTION: String = "Description"
 }
 
 /**
@@ -34,12 +57,13 @@ private[streaming] case class InputInfo(inputStreamId: Int, 
numRecords: Long) {
 private[streaming] class InputInfoTracker(ssc: StreamingContext) extends 
Logging {
 
   // Map to track all the InputInfo related to specific batch time and input 
stream.
-  private val batchTimeToInputInfos = new mutable.HashMap[Time, 
mutable.HashMap[Int, InputInfo]]
+  private val batchTimeToInputInfos =
+    new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]
 
   /** Report the input information with batch time to the tracker */
-  def reportInfo(batchTime: Time, inputInfo: InputInfo): Unit = synchronized {
+  def reportInfo(batchTime: Time, inputInfo: StreamInputInfo): Unit = 
synchronized {
     val inputInfos = batchTimeToInputInfos.getOrElseUpdate(batchTime,
-      new mutable.HashMap[Int, InputInfo]())
+      new mutable.HashMap[Int, StreamInputInfo]())
 
     if (inputInfos.contains(inputInfo.inputStreamId)) {
       throw new IllegalStateException(s"Input stream 
${inputInfo.inputStreamId}} for batch" +
@@ -49,10 +73,10 @@ private[streaming] class InputInfoTracker(ssc: 
StreamingContext) extends Logging
   }
 
   /** Get the all the input stream's information of specified batch time */
-  def getInfo(batchTime: Time): Map[Int, InputInfo] = synchronized {
+  def getInfo(batchTime: Time): Map[Int, StreamInputInfo] = synchronized {
     val inputInfos = batchTimeToInputInfos.get(batchTime)
     // Convert mutable HashMap to immutable Map for the caller
-    inputInfos.map(_.toMap).getOrElse(Map[Int, InputInfo]())
+    inputInfos.map(_.toMap).getOrElse(Map[Int, StreamInputInfo]())
   }
 
   /** Cleanup the tracked input information older than threshold batch time */

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 9f93d6c..f5d4185 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -244,8 +244,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends 
Logging {
     } match {
       case Success(jobs) =>
         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
-        val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
-        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords))
+        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index e6be63b..95833ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -28,7 +28,7 @@ private[streaming]
 case class JobSet(
     time: Time,
     jobs: Seq[Job],
-    streamIdToNumRecords: Map[Int, Long] = Map.empty) {
+    streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
 
   private val incompleteJobs = new HashSet[Job]()
   private val submissionTime = System.currentTimeMillis() // when this jobset 
was submitted
@@ -64,7 +64,7 @@ case class JobSet(
   def toBatchInfo: BatchInfo = {
     new BatchInfo(
       time,
-      streamIdToNumRecords,
+      streamIdToInputInfo,
       submissionTime,
       if (processingStartTime >= 0 ) Some(processingStartTime) else None,
       if (processingEndTime >= 0 ) Some(processingEndTime) else None

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index f750676..0c89166 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.streaming.ui
 
-import java.text.SimpleDateFormat
-import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node, Text}
+import scala.xml.{NodeSeq, Node, Text, Unparsed}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -303,6 +301,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
       
batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
     val formattedTotalDelay = 
batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
 
+    val inputMetadatas = batchUIData.streamIdToInputInfo.values.flatMap { 
inputInfo =>
+      inputInfo.metadataDescription.map(desc => inputInfo.inputStreamId -> 
desc)
+    }.toSeq
     val summary: NodeSeq =
       <div>
         <ul class="unstyled">
@@ -326,6 +327,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
             <strong>Total delay: </strong>
             {formattedTotalDelay}
           </li>
+          {
+            if (inputMetadatas.nonEmpty) {
+              <li>
+                <strong>Input 
Metadata:</strong>{generateInputMetadataTable(inputMetadatas)}
+              </li>
+            }
+          }
         </ul>
       </div>
 
@@ -340,4 +348,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
 
     SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", 
content, parent)
   }
+
+  def generateInputMetadataTable(inputMetadatas: Seq[(Int, String)]): 
Seq[Node] = {
+    <table class={SparkUIUtils.TABLE_CLASS_STRIPED}>
+      <thead>
+        <tr>
+          <th>Input</th>
+          <th>Metadata</th>
+        </tr>
+      </thead>
+      <tbody>
+        {inputMetadatas.flatMap(generateInputMetadataRow)}
+      </tbody>
+    </table>
+  }
+
+  def generateInputMetadataRow(inputMetadata: (Int, String)): Seq[Node] = {
+    val streamId = inputMetadata._1
+
+    <tr>
+      
<td>{streamingListener.streamName(streamId).getOrElse(s"Stream-$streamId")}</td>
+      <td>{metadataDescriptionToHTML(inputMetadata._2)}</td>
+    </tr>
+  }
+
+  private def metadataDescriptionToHTML(metadataDescription: String): 
Seq[Node] = {
+    // tab to 4 spaces and "\n" to "<br/>"
+    Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
+      replaceAllLiterally("\t", 
"&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index a5514df..ae508c0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -19,14 +19,14 @@
 package org.apache.spark.streaming.ui
 
 import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.scheduler.{BatchInfo, StreamInputInfo}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener._
 
 private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, 
sparkJobId: SparkJobId)
 
 private[ui] case class BatchUIData(
     val batchTime: Time,
-    val streamIdToNumRecords: Map[Int, Long],
+    val streamIdToInputInfo: Map[Int, StreamInputInfo],
     val submissionTime: Long,
     val processingStartTime: Option[Long],
     val processingEndTime: Option[Long],
@@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
   /**
    * The number of recorders received by the receivers in this batch.
    */
-  def numRecords: Long = streamIdToNumRecords.values.sum
+  def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
 }
 
 private[ui] object BatchUIData {
@@ -66,7 +66,7 @@ private[ui] object BatchUIData {
   def apply(batchInfo: BatchInfo): BatchUIData = {
     new BatchUIData(
       batchInfo.batchTime,
-      batchInfo.streamIdToNumRecords,
+      batchInfo.streamIdToInputInfo,
       batchInfo.submissionTime,
       batchInfo.processingStartTime,
       batchInfo.processingEndTime

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 68e8ce9..b77c555 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -192,7 +192,7 @@ private[streaming] class StreamingJobProgressListener(ssc: 
StreamingContext)
   def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = 
synchronized {
     val _retainedBatches = retainedBatches
     val latestBatches = _retainedBatches.map { batchUIData =>
-      (batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
+      (batchUIData.batchTime.milliseconds, 
batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
     }
     streamIds.map { streamId =>
       val eventRates = latestBatches.map {
@@ -205,7 +205,8 @@ private[streaming] class StreamingJobProgressListener(ssc: 
StreamingContext)
   }
 
   def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
-    val lastReceivedBlockInfoOption = 
lastReceivedBatch.map(_.streamIdToNumRecords)
+    val lastReceivedBlockInfoOption =
+      lastReceivedBatch.map(_.streamIdToInputInfo.mapValues(_.numRecords))
     lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
       streamIds.map { streamId =>
         (streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 7bc7727..4bc1dd4 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -59,7 +59,7 @@ class StreamingListenerSuite extends TestSuiteBase with 
Matchers {
 
     batchInfosSubmitted.foreach { info =>
       info.numRecords should be (1L)
-      info.streamIdToNumRecords should be (Map(0 -> 1L))
+      info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
     }
 
     isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be 
(true)
@@ -77,7 +77,7 @@ class StreamingListenerSuite extends TestSuiteBase with 
Matchers {
 
     batchInfosStarted.foreach { info =>
       info.numRecords should be (1L)
-      info.streamIdToNumRecords should be (Map(0 -> 1L))
+      info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
     }
 
     isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be 
(true)
@@ -98,7 +98,7 @@ class StreamingListenerSuite extends TestSuiteBase with 
Matchers {
 
     batchInfosCompleted.foreach { info =>
       info.numRecords should be (1L)
-      info.streamIdToNumRecords should be (Map(0 -> 1L))
+      info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
     }
 
     isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be 
(true)

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 31b1aeb..0d58a7b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -76,7 +76,7 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, 
input: Seq[Seq[T]],
     }
 
     // Report the input data's information to InputInfoTracker for testing
-    val inputInfo = InputInfo(id, selectedInput.length.toLong)
+    val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
     ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
     val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
index 2e21039..f5248ac 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
@@ -46,8 +46,8 @@ class InputInfoTrackerSuite extends SparkFunSuite with 
BeforeAndAfter {
     val streamId1 = 0
     val streamId2 = 1
     val time = Time(0L)
-    val inputInfo1 = InputInfo(streamId1, 100L)
-    val inputInfo2 = InputInfo(streamId2, 300L)
+    val inputInfo1 = StreamInputInfo(streamId1, 100L)
+    val inputInfo2 = StreamInputInfo(streamId2, 300L)
     inputInfoTracker.reportInfo(time, inputInfo1)
     inputInfoTracker.reportInfo(time, inputInfo2)
 
@@ -63,8 +63,8 @@ class InputInfoTrackerSuite extends SparkFunSuite with 
BeforeAndAfter {
     val inputInfoTracker = new InputInfoTracker(ssc)
 
     val streamId1 = 0
-    val inputInfo1 = InputInfo(streamId1, 100L)
-    val inputInfo2 = InputInfo(streamId1, 300L)
+    val inputInfo1 = StreamInputInfo(streamId1, 100L)
+    val inputInfo2 = StreamInputInfo(streamId1, 300L)
     inputInfoTracker.reportInfo(Time(0), inputInfo1)
     inputInfoTracker.reportInfo(Time(1), inputInfo2)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1f6b0b12/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index c9175d6..40dc1fb 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -49,10 +49,12 @@ class StreamingJobProgressListenerSuite extends 
TestSuiteBase with Matchers {
     val ssc = setupStreams(input, operation)
     val listener = new StreamingJobProgressListener(ssc)
 
-    val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+    val streamIdToInputInfo = Map(
+      0 -> StreamInputInfo(0, 300L),
+      1 -> StreamInputInfo(1, 300L, 
Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
 
     // onBatchSubmitted
-    val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, 
None, None)
+    val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, 
None, None)
     
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
     listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
     listener.runningBatches should be (Nil)
@@ -64,7 +66,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase 
with Matchers {
     listener.numTotalReceivedRecords should be (0)
 
     // onBatchStarted
-    val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, 
Some(2000), None)
+    val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, 
Some(2000), None)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
@@ -94,7 +96,9 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase 
with Matchers {
     batchUIData.get.schedulingDelay should be 
(batchInfoStarted.schedulingDelay)
     batchUIData.get.processingDelay should be 
(batchInfoStarted.processingDelay)
     batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
-    batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
+    batchUIData.get.streamIdToInputInfo should be (Map(
+      0 -> StreamInputInfo(0, 300L),
+      1 -> StreamInputInfo(1, 300L, 
Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test"))))
     batchUIData.get.numRecords should be(600)
     batchUIData.get.outputOpIdSparkJobIdPairs should be
       Seq(OutputOpIdAndSparkJobId(0, 0),
@@ -103,7 +107,7 @@ class StreamingJobProgressListenerSuite extends 
TestSuiteBase with Matchers {
         OutputOpIdAndSparkJobId(1, 1))
 
     // onBatchCompleted
-    val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, 
Some(2000), None)
+    val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, 
Some(2000), None)
     
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (Nil)
@@ -141,9 +145,9 @@ class StreamingJobProgressListenerSuite extends 
TestSuiteBase with Matchers {
     val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
     val listener = new StreamingJobProgressListener(ssc)
 
-    val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+    val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> 
StreamInputInfo(1, 300L))
 
-    val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, 
Some(2000), None)
+    val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, 
Some(2000), None)
 
     for(_ <- 0 until (limit + 10)) {
       
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -182,7 +186,7 @@ class StreamingJobProgressListenerSuite extends 
TestSuiteBase with Matchers {
     batchUIData.get.schedulingDelay should be 
(batchInfoSubmitted.schedulingDelay)
     batchUIData.get.processingDelay should be 
(batchInfoSubmitted.processingDelay)
     batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
-    batchUIData.get.streamIdToNumRecords should be (Map.empty)
+    batchUIData.get.streamIdToInputInfo should be (Map.empty)
     batchUIData.get.numRecords should be (0)
     batchUIData.get.outputOpIdSparkJobIdPairs should be 
(Seq(OutputOpIdAndSparkJobId(0, 0)))
 
@@ -211,14 +215,14 @@ class StreamingJobProgressListenerSuite extends 
TestSuiteBase with Matchers {
     val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
 
     for (_ <- 0 until 2 * limit) {
-      val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+      val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> 
StreamInputInfo(1, 300L))
 
       // onBatchSubmitted
-      val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 
1000, None, None)
+      val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 
1000, None, None)
       
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
 
       // onBatchStarted
-      val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, 
Some(2000), None)
+      val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, 
Some(2000), None)
       listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
 
       // onJobStart
@@ -235,7 +239,7 @@ class StreamingJobProgressListenerSuite extends 
TestSuiteBase with Matchers {
       listener.onJobStart(jobStart4)
 
       // onBatchCompleted
-      val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 
1000, Some(2000), None)
+      val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 
1000, Some(2000), None)
       
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to