Dan Dutrow created SPARK-21065:
----------------------------------

             Summary: Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
                 Key: SPARK-21065
                 URL: https://issues.apache.org/jira/browse/SPARK-21065
             Project: Spark
          Issue Type: Bug
          Components: Scheduler
    Affects Versions: 2.1.0
            Reporter: Dan Dutrow

My streaming application has 200+ output operations, many of them stateful and several of them windowed. In an attempt to reduce processing times, I set "spark.streaming.concurrentJobs" to 2+. Initial results are very positive, cutting our processing time from ~3 minutes to ~1 minute, but eventually we encounter the exception below.

Note that 1496977560000 ms is 2017-06-09 03:06:00, so the listener is trying to look up a batch from about 45 minutes before the exception is thrown.

2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found 1496977560000 ms
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:59)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
    at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
    ...

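The timestamp arithmetic in the note above can be checked with a few lines of Scala. This is only a sketch: the object and value names are made up for illustration, and it assumes the log line's timestamp is in UTC, which the report does not state explicitly.

```scala
import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}

// Hypothetical helper, not part of Spark: converts the batch time from the
// exception message and compares it with the time of the ERROR log line.
object BatchTimeCheck {
  // Batch time from the exception message, in epoch milliseconds.
  val missingBatchMs: Long = 1496977560000L

  // Timestamp of the ERROR log line; ASSUMED to be UTC.
  val errorLoggedAt: Instant =
    LocalDateTime.parse("2017-06-09T03:50:28.259").toInstant(ZoneOffset.UTC)

  // The batch time as an instant on the UTC timeline.
  val batchTime: Instant = Instant.ofEpochMilli(missingBatchMs)

  // How long after the batch time the listener failed.
  val lag: Duration = Duration.between(batchTime, errorLoggedAt)
}
```

Under that assumption, `BatchTimeCheck.batchTime` is 2017-06-09T03:06:00Z and `BatchTimeCheck.lag.toMinutes` is 44, which matches the "about 45 minutes" in the report.
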
The Spark code causing the exception is here (the runningBatchUIData lookup is the part highlighted in the report):

https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125

    override def onOutputOperationCompleted(
        outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
      // This method is called before onBatchCompleted
      runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).
        updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
    }

It seems to me that the exception may be caused by that batch having been removed from runningBatchUIData earlier, in onBatchCompleted (the runningBatchUIData.remove call is the part highlighted in the report):

https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      synchronized {
        waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
        runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
        val batchUIData = BatchUIData(batchCompleted.batchInfo)
        completedBatchUIData.enqueue(batchUIData)
        if (completedBatchUIData.size > batchUIDataLimit) {
          val removedBatch = completedBatchUIData.dequeue()
          batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
        }
        totalCompletedBatches += 1L
        totalProcessedRecords += batchUIData.numRecords
      }
    }

What is the solution here? Should I make my Spark Streaming context's remember duration a lot longer?

    ssc.remember(batchDuration * rememberDuration)

Otherwise, it seems like there should be some kind of existence check on runningBatchUIData before dereferencing it.
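
To make the suggested existence check concrete, here is a minimal sketch that runs without Spark. `RunningBatchLookup`, `BatchState`, `recordUnguarded` and `recordGuarded` are hypothetical names invented for this illustration; a plain mutable HashMap stands in for runningBatchUIData. It is not the actual listener code and not a proposed patch.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the listener's bookkeeping; not Spark code.
object RunningBatchLookup {
  // Simplified stand-in for BatchUIData: only counts completed output operations.
  final class BatchState(var completedOps: Int = 0)

  // Stand-in for runningBatchUIData, keyed by batch time in milliseconds.
  val runningBatches = mutable.HashMap.empty[Long, BatchState]

  // Mirrors the unguarded lookup: HashMap.apply throws
  // NoSuchElementException when the key has already been removed.
  def recordUnguarded(batchTimeMs: Long): Unit = {
    val state = runningBatches(batchTimeMs)
    state.completedOps += 1
  }

  // Guarded variant: looks the batch up with get and skips the update
  // when it is gone. Returns true only if the update was applied.
  def recordGuarded(batchTimeMs: Long): Boolean =
    runningBatches.get(batchTimeMs) match {
      case Some(state) =>
        state.completedOps += 1
        true
      case None =>
        false
    }
}
```

If a batch is removed (as onBatchCompleted does) before a late output-operation event arrives, `recordUnguarded` fails in the same way as the stack trace above, while `recordGuarded` returns false and leaves the map untouched. Whether silently skipping the update is acceptable for the streaming UI is a separate question that this sketch does not answer.
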