[
https://issues.apache.org/jira/browse/SPARK-13356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718181#comment-16718181
]
ASF GitHub Bot commented on SPARK-13356:
vanzin closed pull request #11228: [SPARK-13356][Streaming] WebUI missing input
information when recovering from driver failure
URL: https://github.com/apache/spark/pull/11228
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 7e57bb18cbd50..01404b7a53540 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -313,16 +313,32 @@ private[spark] class DirectKafkaInputDStream[K, V](
     override def restore(): Unit = {
       batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-        generatedRDDs += t -> new KafkaRDD[K, V](
-          context.sparkContext,
-          executorKafkaParams,
-          b.map(OffsetRange(_)),
-          getPreferredHosts,
-          // during restore, it's possible same partition will be consumed from multiple
-          // threads, so dont use cache
-          false
-        )
+        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+        val recoverOffsets = b.map(OffsetRange(_))
+        val rdd = new KafkaRDD[K, V](
+          context.sparkContext,
+          executorKafkaParams,
+          recoverOffsets,
+          getPreferredHosts,
+          // during restore, it's possible same partition will be consumed from multiple
+          // threads, so dont use cache
+          false
+        )
+        // Report the record number and metadata of this batch interval to InputInfoTracker.
+        val description = recoverOffsets.filter { offsetRange =>
+          // Don't display empty ranges.
+          offsetRange.fromOffset != offsetRange.untilOffset
+        }.map { offsetRange =>
+          s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+            s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+        }.mkString("\n")
+        // Copy offsetRanges to immutable.List to prevent from being modified by the user
+        val metadata = Map(
+          "offsets" -> recoverOffsets.toList,
+          StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+        val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+        ssc.scheduler.inputInfoTracker.reportInfo(t, inputInfo)
+        generatedRDDs += t -> rdd
       }
     }
   }
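For context: the description string built above is a plain filter/map/mkString over the recovered offset ranges, skipping empty ranges so the WebUI only lists partitions that actually carried records. Below is a minimal, self-contained Scala sketch of that logic; the OffsetRange case class here is a local stand-in for org.apache.spark.streaming.kafka010.OffsetRange (same field names) and describe is a hypothetical helper name, so the snippet runs in a plain Scala REPL without Spark on the classpath.

// Local stand-in for Spark's OffsetRange (hypothetical, for illustration only).
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical helper mirroring the description-building logic in the patch.
def describe(recoverOffsets: Seq[OffsetRange]): String =
  recoverOffsets.filter { offsetRange =>
    // Empty ranges carry no records, so they are omitted from the WebUI text.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

// One 50-record range is rendered; the empty range on partition 1 is filtered out.
println(describe(Seq(
  OffsetRange("events", 0, 100L, 150L),
  OffsetRange("events", 1, 200L, 200L))))
// => topic: events    partition: 0    offsets: 100 to 150

The rest of the patch then packs the raw offsets and this description (keyed by StreamInputInfo.METADATA_KEY_DESCRIPTION) into a metadata map and reports StreamInputInfo(id, rdd.count, metadata) to the InputInfoTracker, which is what makes the restored batches show up with input sizes in the streaming WebUI.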
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index c3c799375bbeb..274d1cbbb07e1 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -210,9 +210,26 @@ class DirectKafkaInputDStream[
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
 
       batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-        generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
-          context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+        val recoverOffsets = b.map(OffsetRange(_))
+        val rdd = new KafkaRDD[K, V, U, T, R](
+          context.sparkContext, kafkaParams, recoverOffsets, leaders, messageHandler)
+
+        val description = recoverOffsets.filter { offsetRange =>
+          // Don't display empty ranges.
+          offsetRange.fromOffset != offsetRange.untilOffset
+        }.map { offsetRange =>
+          s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+            s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+        }.mkString("\n")
+
+        // Copy offsetRanges to immutable.List to prevent from being modified by the user
+        val metadata = Map(
+          "offsets" -> recoverOffsets.toList,
+          StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+        val inputInfo