[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216375#comment-17216375 ]

Shixiong Zhu commented on SPARK-21065:
--------------------------------------

If you are seeing many active batches, it's likely your streaming application 
is too slow. You can try looking at the UI to see if there is anything obvious 
that you can optimize.

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> ----------------------------------------------------------------------
>
>                 Key: SPARK-21065
>                 URL: https://issues.apache.org/jira/browse/SPARK-21065
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Scheduler, Spark Core, Web UI
>    Affects Versions: 2.1.0
>            Reporter: Dan Dutrow
>            Priority: Major
>
> My streaming application has 200+ output operations, many of them stateful 
> and several of them windowed. In an attempt to reduce the processing time, I 
> set "spark.streaming.concurrentJobs" to 2+. Initial results are very 
> positive, cutting our processing time from ~3 minutes to ~1 minute, but 
> eventually we encounter the following exception:
> (Note that 1496977560000 ms is 2017-06-09 03:06:00, so it's trying to get a 
> batch from 45 minutes before the exception is thrown.)
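> (A quick way to double-check that timestamp: java.time decodes the batch time 
> straight from its epoch-millisecond value.)
>     import java.time.Instant
>     // batch time taken from the missing key in the log below
>     Instant.ofEpochMilli(1496977560000L)  // 2017-06-09T03:06:00Z
>     // the listener threw at 03:50:28, roughly 45 minutes later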
> 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
> org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
> StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 1496977560000 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
> ...
> The Spark code causing the exception is here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
>   override def onOutputOperationCompleted(
>       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
>     // This method is called before onBatchCompleted
>     runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).
>       updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
>   }
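> (For context: runningBatchUIData is a mutable HashMap, and HashMap.apply 
> throws java.util.NoSuchElementException when the key is absent, which is 
> exactly what the stack trace shows. A minimal illustration, using a plain 
> map keyed by batch time as a stand-in:)
>     import scala.collection.mutable
>     val runningBatches = mutable.HashMap[Long, String]()  // stand-in for runningBatchUIData
>     runningBatches.get(1496977560000L)  // returns None -- safe
>     runningBatches(1496977560000L)      // throws NoSuchElementException: key not found: 1496977560000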
> It seems to me that it may be caused by that batch being removed earlier.
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102
>   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>     synchronized {
>       waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>       runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>       val batchUIData = BatchUIData(batchCompleted.batchInfo)
>       completedBatchUIData.enqueue(batchUIData)
>       if (completedBatchUIData.size > batchUIDataLimit) {
>         val removedBatch = completedBatchUIData.dequeue()
>         batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
>       }
>       totalCompletedBatches += 1L
>       totalProcessedRecords += batchUIData.numRecords
>     }
>   }
> What is the solution here? Should I make my Spark streaming context remember 
> batches for a lot longer, e.g. ssc.remember(batchDuration * rememberMultiple)?
> Otherwise, it seems like there should be some kind of existence check on 
> runningBatchUIData before dereferencing it, as sketched below.
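> For example, a guard along these lines (just a sketch of the idea against the 
> 2.1 listener, not a tested patch) would let the listener tolerate a batch that 
> has already been moved out of runningBatchUIData:
>   override def onOutputOperationCompleted(
>       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
>     // get returns an Option, so a batch time that was already removed is
>     // silently skipped instead of throwing NoSuchElementException
>     runningBatchUIData.get(outputOperationCompleted.outputOperationInfo.batchTime).foreach {
>       _.updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
>     }
>   }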


