[ https://issues.apache.org/jira/browse/SPARK-13356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718181#comment-16718181 ]

ASF GitHub Bot commented on SPARK-13356:
----------------------------------------

vanzin closed pull request #11228: [SPARK-13356][Streaming] WebUI missing input information when recovering from driver failure
URL: https://github.com/apache/spark/pull/11228
 
 
   


As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 7e57bb18cbd50..01404b7a53540 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -313,16 +313,32 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
     override def restore(): Unit = {
       batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-         generatedRDDs += t -> new KafkaRDD[K, V](
-           context.sparkContext,
-           executorKafkaParams,
-           b.map(OffsetRange(_)),
-           getPreferredHosts,
-           // during restore, it's possible same partition will be consumed from multiple
-           // threads, so dont use cache
-           false
-         )
+        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+        val recoverOffsets = b.map(OffsetRange(_))
+        val rdd = new KafkaRDD[K, V](
+          context.sparkContext,
+          executorKafkaParams,
+          recoverOffsets,
+          getPreferredHosts,
+          // during restore, it's possible the same partition will be consumed from
+          // multiple threads, so don't use the cache
+          false
+        )
+        // Report the record number and metadata of this batch interval to InputInfoTracker.
+        val description = recoverOffsets.filter { offsetRange =>
+          // Don't display empty ranges.
+          offsetRange.fromOffset != offsetRange.untilOffset
+        }.map { offsetRange =>
+          s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+            s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+        }.mkString("\n")
+        // Copy offsetRanges to an immutable.List to prevent it from being modified by the user.
+        val metadata = Map(
+          "offsets" -> recoverOffsets.toList,
+          StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+        val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+        ssc.scheduler.inputInfoTracker.reportInfo(t, inputInfo)
+        generatedRDDs += t -> rdd
       }
     }
   }
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index c3c799375bbeb..274d1cbbb07e1 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -210,9 +210,26 @@ class DirectKafkaInputDStream[
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
 
       batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-         generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
-           context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+        val recoverOffsets = b.map(OffsetRange(_))
+        val rdd = new KafkaRDD[K, V, U, T, R](
+          context.sparkContext, kafkaParams, recoverOffsets, leaders, messageHandler)
+
+        val description = recoverOffsets.filter { offsetRange =>
+          // Don't display empty ranges.
+          offsetRange.fromOffset != offsetRange.untilOffset
+        }.map { offsetRange =>
+          s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+            s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+        }.mkString("\n")
+
+        // Copy offsetRanges to an immutable.List to prevent it from being modified by the user.
+        val metadata = Map(
+          "offsets" -> recoverOffsets.toList,
+          StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+        val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+        ssc.scheduler.inputInfoTracker.reportInfo(t, inputInfo)
+        generatedRDDs += t -> rdd
       }
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 4808d0fcbc6cc..3d08ceb3dd917 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -155,7 +155,6 @@ class StreamingContext private[streaming] (
   private[streaming] val graph: DStreamGraph = {
     if (isCheckpointPresent) {
       _cp.graph.setContext(this)
-      _cp.graph.restoreCheckpointData()
       _cp.graph
     } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
@@ -181,6 +180,11 @@ class StreamingContext private[streaming] (
   }
 
   private[streaming] val scheduler = new JobScheduler(this)
+  // Restoring checkpoint data reports info to the inputInfoTracker,
+  // so it has to run after the JobScheduler is initialized.
+  if (isCheckpointPresent) {
+    graph.restoreCheckpointData()
+  }
 
   private[streaming] val waiter = new ContextWaiter
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index ed9305875cb77..678811bbfdc08 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -345,6 +345,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
             f.mkString("[", ", ", "]") )
          batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
           recentlySelectedFiles ++= f
+          val metadata = Map(
+            "files" -> f.toList,
+            StreamInputInfo.METADATA_KEY_DESCRIPTION -> f.mkString("\n"))
+          val inputInfo = StreamInputInfo(id, 0, metadata)
+          ssc.scheduler.inputInfoTracker.reportInfo(t, inputInfo)
           generatedRDDs += ((t, filesToRDD(f)))
       }
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 98e099354a7db..9ab305a4a2941 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -59,7 +59,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
-  var inputInfoTracker: InputInfoTracker = null
+  var inputInfoTracker: InputInfoTracker = new InputInfoTracker(ssc)

  private var executorAllocationManager: Option[ExecutorAllocationManager] = None
 
@@ -84,7 +84,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
-    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index b79cc65d8b5e9..2818657b023bd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -56,6 +56,8 @@ class CheckpointInputDStream(_ssc: StreamingContext) extends InputDStream[Int](_
     @transient
     var restoredTimes = 0
     override def restore() {
+      val inputInfo = StreamInputInfo(0, 100L)
+      ssc.scheduler.inputInfoTracker.reportInfo(Time(0), inputInfo)
       restoredTimes += 1
       super.restore()
     }
@@ -598,6 +600,23 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
    ssc.stop()
  }

+  test("SPARK-13356: report information when recovering from driver failure") {
+    withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+      ssc.checkpoint(checkpointDir)
+      val inputDStream = new CheckpointInputDStream(ssc)
+      val mappedDStream = inputDStream.map(_ + 100)
+      val outputStream = new TestOutputStreamWithPartitions(mappedDStream)
+      outputStream.register()
+      val batchDurationMillis = ssc.progressListener.batchDuration
+      generateOutput(ssc, Time(batchDurationMillis), checkpointDir, stopSparkContext = true)
+    }
+    logInfo("*********** RESTARTING ************")
+    withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+      // Verify that input info can be retrieved after recovering the StreamingContext from the checkpoint
+      assert(ssc.scheduler.inputInfoTracker.getInfo(Time(0)).size === 1)
+    }
+  }
+
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the
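
For reference, all three restore() paths patched above share the same shape: rebuild the RDD for each checkpointed batch time, then report a StreamInputInfo for that batch so it appears on the streaming Web UI. Below is a self-contained sketch of that shape in plain Scala; the types are simplified stand-ins (the real StreamInputInfo and InputInfoTracker are private[streaming]), so it models the logic rather than reproducing the actual API:

object RestoreReportSketch {
  // Simplified stand-ins for Spark's private[streaming] classes.
  case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
  case class StreamInputInfo(inputStreamId: Int, numRecords: Long, metadata: Map[String, Any])

  class InputInfoTracker {
    private val infos = scala.collection.mutable.Map.empty[Long, StreamInputInfo]
    def reportInfo(batchTime: Long, info: StreamInputInfo): Unit = infos(batchTime) = info
    def getInfo(batchTime: Long): Option[StreamInputInfo] = infos.get(batchTime)
  }

  // The restore-and-report pattern: one report per recovered batch time.
  def restore(batchForTime: Map[Long, Seq[OffsetRange]], id: Int, tracker: InputInfoTracker): Unit = {
    batchForTime.toSeq.sortBy(_._1).foreach { case (t, ranges) =>
      // 1. Rebuild the batch (the real code recreates a KafkaRDD here).
      val numRecords = ranges.map(r => r.untilOffset - r.fromOffset).sum
      // 2. Build the human-readable description shown on the UI, skipping empty ranges.
      val description = ranges
        .filter(r => r.fromOffset != r.untilOffset)
        .map(r => s"topic: ${r.topic}\tpartition: ${r.partition}\t" +
          s"offsets: ${r.fromOffset} to ${r.untilOffset}")
        .mkString("\n")
      // 3. Report to the tracker -- the step the patch adds, which is what makes
      //    recovered batches visible on the Web UI.
      tracker.reportInfo(t, StreamInputInfo(id, numRecords,
        Map("offsets" -> ranges.toList, "description" -> description)))
    }
  }
}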


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WebUI missing input information when recovering from driver failure
> --------------------------------------------------------------------
>
>                 Key: SPARK-13356
>                 URL: https://issues.apache.org/jira/browse/SPARK-13356
>             Project: Spark
>          Issue Type: Bug
>          Components: Web UI
>    Affects Versions: 1.5.2, 1.6.0
>            Reporter: jeanlyn
>            Priority: Minor
>         Attachments: DirectKafkaScreenshot.jpg
>
>
> The WebUI is missing some input information when streaming recovers from a
> checkpoint, which may lead people to think data was lost during recovery.
> For example:
> !DirectKafkaScreenshot.jpg!
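
The scenario in question is the standard checkpoint-recovery pattern, sketched below; the checkpoint path and application name are illustrative and the stream setup is elided:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryExample {
  // Illustrative path; any reliable filesystem (e.g. HDFS) works.
  val checkpointDir = "hdfs:///tmp/spark-13356-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpoint-recovery-example")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir)
    // ... create the Kafka/file input stream and output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // After a driver failure, the restarted driver rebuilds the context from
    // the checkpoint instead of calling createContext(); it is these recovered
    // batches whose input information was missing from the Web UI.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}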



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
