[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216375#comment-17216375 ] Shixiong Zhu commented on SPARK-21065:
--
If you are seeing many active batches, your streaming application is likely too slow. Try looking at the UI to see if there is anything obvious that you can optimize.

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
> Issue Type: Bug
> Components: DStreams, Scheduler, Spark Core, Web UI
> Affects Versions: 2.1.0
> Reporter: Dan Dutrow
> Priority: Major
>
> My streaming application has 200+ output operations, many of them stateful
> and several of them windowed. In an attempt to reduce the processing times, I
> set "spark.streaming.concurrentJobs" to 2+. Initial results are very
> positive, cutting our processing time from ~3 minutes to ~1 minute, but
> eventually we encounter an exception as follows:
> (Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a
> batch from 45 minutes before the exception is thrown.)
> 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 149697756 ms
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:59)
>   at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>   at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
>   at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
>   at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
>   at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
>   at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
>   at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
>   ...
> The Spark code causing the exception is here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
> override def onOutputOperationCompleted(
>     outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
>   // This method is called before onBatchCompleted
>   {color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
>     updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
> }
> It seems to me that it may be caused by that batch being removed earlier.
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102
> override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>   synchronized {
>     waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>     {color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
>     val batchUIData = BatchUIData(batchCompleted.batchInfo)
>     completedBatchUIData.enqueue(batchUIData)
>     if (completedBatchUIData.size > batchUIDataLimit) {
>       val removedBatch = completedBatchUIData.dequeue()
>       batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
>     }
>     totalCompletedBatches += 1L
>     totalProcessedRecords += batchUIData.numRecords
>   }
> }
> What is the solution here? Should I make my spark streaming context remember duration a lot longer? ssc.remember(batchDuration * rememberMultiple)
> Otherwise, it seems like there should be some kind of existence check on runningBatchUIData before dereferencing it.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
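The existence check suggested at the end of the description could look like the sketch below. It is a stand-alone illustrative model, not the actual Spark source: the map and method names mirror the listener code for readability, and a String stands in for BatchUIData. The point is the difference between `apply` (which throws when onBatchCompleted has already removed the batch) and `get` (which returns an Option and skips the update).

```scala
import scala.collection.mutable

// Simplified stand-in for StreamingJobProgressListener's per-batch state:
// a map from batch time (ms) to the UI data for that batch.
object ListenerSketch {
  val runningBatchUIData = mutable.HashMap[Long, String]()

  // Unsafe variant: HashMap.apply throws NoSuchElementException when the
  // batch was already removed by onBatchCompleted -- the failure seen above.
  def updateUnsafe(batchTime: Long): String =
    runningBatchUIData(batchTime)

  // Defensive variant: HashMap.get returns an Option, so a missing batch is
  // silently skipped instead of killing the listener bus.
  def updateSafe(batchTime: Long): Option[String] =
    runningBatchUIData.get(batchTime).map { data =>
      // ...updateOutputOperationInfo(...) would run here in the real listener
      data
    }
}
```

With a batch present, `updateSafe` applies the update; with the batch already removed, it returns None where `updateUnsafe` would throw.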
[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216342#comment-17216342 ] Sachit Murarka commented on SPARK-21065:
[~zsxwing] Thanks for the quick response. Any suggestion on optimizing many active batches? (Probably I should reduce the processing time or increase the batch interval.) Correct? Anything else?
[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216341#comment-17216341 ] Shixiong Zhu commented on SPARK-21065:
--
`spark.streaming.concurrentJobs` is not safe. Fixing it would require fundamental system changes. We don't have any plan for this.
[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216330#comment-17216330 ] Sachit Murarka commented on SPARK-21065:
[~zsxwing] Any idea whether concurrentJobs still causes issues in the 2.4 release of Spark as well?
[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040456#comment-17040456 ] Artur Sukhenko commented on SPARK-21065:
[~zsxwing] Is `spark.streaming.concurrentJobs` still risky in 2.2/2.3/2.4?
[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16052242#comment-16052242 ] Shixiong Zhu commented on SPARK-21065:
--
Please don't use `spark.streaming.concurrentJobs` if possible. It doesn't work with many features and can also cause data loss.
[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16046649#comment-16046649 ] Dan Dutrow commented on SPARK-21065:
Something to note: if one batch's processing time exceeds the batch interval, then a second batch can begin before the first is complete. This is fine behavior for us, and offers the ability to catch up if something gets delayed, but may be confusing for the scheduler.
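For anyone experimenting despite the warnings earlier in this thread, the two knobs discussed here can be wired together roughly as below. This is a sketch only: `rememberMultiple` is an illustrative variable taken from the issue description, not a Spark setting, and `spark.streaming.concurrentJobs` remains undocumented and unsafe.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("concurrent-jobs-sketch")
  // Undocumented and unsafe per the comments above: lets the scheduler run
  // more than one output job at a time.
  .set("spark.streaming.concurrentJobs", "2")

val batchDuration = Seconds(60)
val ssc = new StreamingContext(conf, batchDuration)

// Keep batch metadata around longer so late listener events can still find
// their batch. rememberMultiple is a made-up knob from the description,
// chosen here so remembered RDDs cover ~10 batch intervals.
val rememberMultiple = 10
ssc.remember(batchDuration * rememberMultiple)
```

Note that `remember` extends how long RDDs are retained; as the thread concludes, it does not remove the listener race itself, so the defensive lookup (or avoiding concurrentJobs entirely) is the safer path.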