[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046649#comment-16046649 ]

Dan Dutrow commented on SPARK-21065:
------------------------------------

Something to note: if one batch's processing time exceeds the batch interval, 
then a second batch can begin before the first is complete. This behavior is 
fine for us, since it offers the ability to catch up if something gets delayed, 
but it may be confusing for the scheduler.
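
For context, a minimal sketch of how we enable this, assuming the (undocumented) 
spark.streaming.concurrentJobs property and a hypothetical 60-second batch interval:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("MyStreamingApp")                // hypothetical app name
      .set("spark.streaming.concurrentJobs", "2")  // lets jobs from a second batch start before the first finishes

    val ssc = new StreamingContext(conf, Seconds(60))  // assumed batch interval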

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> ----------------------------------------------------------------------
>
>                 Key: SPARK-21065
>                 URL: https://issues.apache.org/jira/browse/SPARK-21065
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Dan Dutrow
>
> My streaming application has 200+ output operations, many of them stateful 
> and several of them windowed. In an attempt to reduce the processing times, I 
> set "spark.streaming.concurrentJobs" to 2+. Initial results are very 
> positive, cutting our processing time from ~3 minutes to ~1 minute, but 
> eventually we encounter an exception as follows:
> Note that 1496977560000 ms is 2017-06-09 03:06:00 UTC, so the listener is trying 
> to look up a batch from about 45 minutes before the exception is thrown.
> 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 1496977560000 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
> at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
> at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
> at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
> ...
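> As a quick check of the 45-minute gap noted above (epoch millis to UTC in a Scala REPL):
>     java.time.Instant.ofEpochMilli(1496977560000L)  // 2017-06-09T03:06:00Z, ~44.5 minutes before the log line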
> The Spark code causing the exception is here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
>   override def onOutputOperationCompleted(
>       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
>     // This method is called before onBatchCompleted
>     {color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
>       updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
>   }
> It seems to me that this may be caused by that batch having already been removed here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102
>   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>     synchronized {
>       waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>       {color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
>       val batchUIData = BatchUIData(batchCompleted.batchInfo)
>       completedBatchUIData.enqueue(batchUIData)
>       if (completedBatchUIData.size > batchUIDataLimit) {
>         val removedBatch = completedBatchUIData.dequeue()
>         batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
>       }
>       totalCompletedBatches += 1L
>       totalProcessedRecords += batchUIData.numRecords
>     }
>   }
> What is the solution here? Should I make my Spark streaming context remember 
> batches for much longer, e.g. ssc.remember(batchDuration * rememberMultiple)?
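> A rough sketch of what I mean (rememberMultiple is a made-up knob for illustration, not a Spark setting):
>     import org.apache.spark.streaming.Minutes
>     // Assumed ~1-minute batch interval; keep batch metadata far longer than the default.
>     val batchDuration = Minutes(1)
>     val rememberMultiple = 60
>     ssc.remember(batchDuration * rememberMultiple)  // retain roughly an hour of batches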
> Otherwise, it seems like there should be some kind of existence check on 
> runningBatchUIData before dereferencing it.
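> Something along these lines, as a sketch of the kind of check I mean (not a tested patch):
>   override def onOutputOperationCompleted(
>       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
>     // Only update the batch if it is still tracked as running; with concurrentJobs > 1 it may
>     // already have been removed by onBatchCompleted.
>     runningBatchUIData.get(outputOperationCompleted.outputOperationInfo.batchTime).foreach {
>       _.updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
>     }
>   }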


