[jira] [Commented] (SPARK-13356) WebUI missing input information when recovering from driver failure

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-13356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718180#comment-16718180
 ] 

ASF GitHub Bot commented on SPARK-13356:


vanzin commented on issue #11228: [SPARK-13356][Streaming] WebUI missing input information when recovering from driver failure
URL: https://github.com/apache/spark/pull/11228#issuecomment-446402361
 
 
   This looks very out of date. I'll close this for now, but if you want to 
update and reopen the PR, just push to your branch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WebUI missing input information when recovering from driver failure
> 
>
> Key: SPARK-13356
> URL: https://issues.apache.org/jira/browse/SPARK-13356
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 1.5.2, 1.6.0
>Reporter: jeanlyn
>Priority: Minor
> Attachments: DirectKafkaScreenshot.jpg
>
>
> The WebUI is missing some input information when a streaming job recovers 
> from a checkpoint, which may lead people to believe data was lost during 
> recovery from a failure.
> For example:
> !DirectKafkaScreenshot.jpg!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13356) WebUI missing input information when recovering from driver failure

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-13356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718181#comment-16718181
 ] 

ASF GitHub Bot commented on SPARK-13356:


vanzin closed pull request #11228: [SPARK-13356][Streaming] WebUI missing input information when recovering from driver failure
URL: https://github.com/apache/spark/pull/11228
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 7e57bb18cbd50..01404b7a53540 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -313,16 +313,32 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
     override def restore(): Unit = {
       batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-        generatedRDDs += t -> new KafkaRDD[K, V](
-          context.sparkContext,
-          executorKafkaParams,
-          b.map(OffsetRange(_)),
-          getPreferredHosts,
-          // during restore, it's possible the same partition will be consumed from multiple
-          // threads, so don't use the cache
-          false
-        )
+        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+        val recoverOffsets = b.map(OffsetRange(_))
+        val rdd = new KafkaRDD[K, V](
+          context.sparkContext,
+          executorKafkaParams,
+          recoverOffsets,
+          getPreferredHosts,
+          // during restore, it's possible the same partition will be consumed from multiple
+          // threads, so don't use the cache
+          false
+        )
+        // Report the record number and metadata of this batch interval to InputInfoTracker.
+        val description = recoverOffsets.filter { offsetRange =>
+          // Don't display empty ranges.
+          offsetRange.fromOffset != offsetRange.untilOffset
+        }.map { offsetRange =>
+          s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+            s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+        }.mkString("\n")
+        // Copy offsetRanges to an immutable List to prevent modification by the user.
+        val metadata = Map(
+          "offsets" -> recoverOffsets.toList,
+          StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+        val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+        ssc.scheduler.inputInfoTracker.reportInfo(t, inputInfo)
+        generatedRDDs += t -> rdd
       }
     }
   }
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index c3c799375bbeb..274d1cbbb07e1 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -210,9 +210,26 @@ class DirectKafkaInputDStream[
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
 
       batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-        generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
-          context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+        val recoverOffsets = b.map(OffsetRange(_))
+        val rdd = new KafkaRDD[K, V, U, T, R](
+          context.sparkContext, kafkaParams, recoverOffsets, leaders, messageHandler)
+
+        val description = recoverOffsets.filter { offsetRange =>
+          // Don't display empty ranges.
+          offsetRange.fromOffset != offsetRange.untilOffset
+        }.map { offsetRange =>
+          s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+            s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+        }.mkString("\n")
+
+        // Copy offsetRanges to an immutable List to prevent modification by the user.
+        val metadata = Map(
+          "offsets" -> recoverOffsets.toList,
+          StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+        val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+        ssc.scheduler.inputInfoTracker.reportInfo(t, inputInfo)
+        generatedRDDs += t -> rdd

[jira] [Commented] (SPARK-13356) WebUI missing input information when recovering from driver failure

2016-02-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149727#comment-15149727
 ] 

Apache Spark commented on SPARK-13356:
--

User 'jeanlyn' has created a pull request for this issue:
https://github.com/apache/spark/pull/11228

> WebUI missing input information when recovering from driver failure
> 
>
> Key: SPARK-13356
> URL: https://issues.apache.org/jira/browse/SPARK-13356
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
>Reporter: jeanlyn
> Attachments: DirectKafkaScreenshot.jpg
>
>
> The WebUI is missing some input information when a streaming job recovers 
> from a checkpoint, which may lead people to believe data was lost during 
> recovery from a failure.
> For example:
> !DirectKafkaScreenshot.jpg!


