[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2020-10-18 Thread Shixiong Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216375#comment-17216375
 ] 

Shixiong Zhu commented on SPARK-21065:
--

If you are seeing many active batches, it's likely your streaming application 
is too slow. You can look at the UI and see if there is anything obvious 
that you can optimize.

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Scheduler, Spark Core, Web UI
>Affects Versions: 2.1.0
>Reporter: Dan Dutrow
>Priority: Major
>
> My streaming application has 200+ output operations, many of them stateful 
> and several of them windowed. In an attempt to reduce the processing times, I 
> set "spark.streaming.concurrentJobs" to 2+. Initial results are very 
> positive, cutting our processing time from ~3 minutes to ~1 minute, but 
> eventually we encounter an exception as follows:
> (Note that 1496977560000 ms is 2017-06-09 03:06:00, so it's trying to get a 
> batch from about 45 minutes before the exception is thrown.)
> 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found 1496977560000 ms
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:59)
>     at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>     at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
>     at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
>     at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
>     at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
>     at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
>     at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
>     ...
> The Spark code causing the exception is here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
>   override def onOutputOperationCompleted(
>       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
>     // This method is called before onBatchCompleted
>     // (line highlighted by the reporter: HashMap.apply throws if the batch time is absent)
>     runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).
>       updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
>   }
> It seems to me that it may be caused by that batch being removed earlier.
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102
>   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>     synchronized {
>       waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>       // (line highlighted by the reporter: the batch leaves runningBatchUIData here)
>       runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>       val batchUIData = BatchUIData(batchCompleted.batchInfo)
>       completedBatchUIData.enqueue(batchUIData)
>       if (completedBatchUIData.size > batchUIDataLimit) {
>         val removedBatch = completedBatchUIData.dequeue()
>         batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
>       }
>       totalCompletedBatches += 1L
>       totalProcessedRecords += batchUIData.numRecords
>     }
>   }
> What is the solution here? Should I make my StreamingContext's remember 
> duration a lot longer, e.g. ssc.remember(batchDuration * rememberMultiple)?
> Otherwise, it seems like there should be some kind of existence check on 
> runningBatchUIData before dereferencing it.
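
A minimal sketch of the existence check suggested at the end of the report, using simplified stand-ins rather than Spark's actual classes. `BatchData`, `ListenerSketch`, and the `Long` batch-time key are hypothetical names for illustration; only the `mutable.HashMap` lookup behaviour mirrors the stack trace. It contrasts `apply`, which throws `NoSuchElementException` for a missing key, with `get`, which lets the caller skip the update when the batch has already been removed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's per-batch UI data; not the real BatchUIData.
final class BatchData {
  var completedOps: Int = 0
  def updateOutputOperationInfo(): Unit = completedOps += 1
}

// Hypothetical, simplified listener; keys are batch times in milliseconds.
object ListenerSketch {
  private val runningBatchUIData = mutable.HashMap.empty[Long, BatchData]

  def onBatchStarted(batchTime: Long): Unit = synchronized {
    runningBatchUIData(batchTime) = new BatchData
  }

  def onBatchCompleted(batchTime: Long): Unit = synchronized {
    runningBatchUIData.remove(batchTime)
  }

  // Unchecked lookup, as in the quoted code: HashMap.apply throws if the key is gone.
  def onOutputOperationCompletedUnchecked(batchTime: Long): Unit = synchronized {
    runningBatchUIData(batchTime).updateOutputOperationInfo()
  }

  // Checked lookup: returns false instead of throwing when the batch was already removed.
  def onOutputOperationCompletedChecked(batchTime: Long): Boolean = synchronized {
    runningBatchUIData.get(batchTime) match {
      case Some(data) => data.updateOutputOperationInfo(); true
      case None       => false
    }
  }
}
```

A guard like this would only keep the UI listener from throwing; going by the maintainer comments in this thread, it would not by itself make `spark.streaming.concurrentJobs` safe to use.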



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2020-10-18 Thread Sachit Murarka (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216342#comment-17216342
 ] 

Sachit Murarka commented on SPARK-21065:


[~zsxwing] Thanks for the quick response. Any suggestions on reducing the 
number of active batches? I should probably reduce the processing time or 
increase the batch interval, correct? Anything else?
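
As a rough illustration of why either change helps, the sketch below assumes batches run one at a time, a fixed batch interval, and a constant processing time. `BacklogSketch`, `schedulingDelayMs`, and `activeBatches` are hypothetical helpers, not Spark APIs, and the numbers in the usage note are made up. Under those assumptions each batch falls further behind by the difference between processing time and batch interval, so the backlog keeps growing until that difference is zero or negative.

```scala
object BacklogSketch {
  // Delay between batch n (0-based) being generated and starting to run,
  // assuming sequential execution and a constant processing time.
  def schedulingDelayMs(n: Int, batchIntervalMs: Long, processingMs: Long): Long =
    math.max(0L, n * (processingMs - batchIntervalMs))

  // Batches generated but not yet finished at the moment batch n is generated
  // (batch n itself included). Requires processingMs > 0.
  def activeBatches(n: Int, batchIntervalMs: Long, processingMs: Long): Long = {
    val finished = math.min(n.toLong, n * batchIntervalMs / processingMs)
    n + 1 - finished
  }
}
```

For example, with a 10 s interval and 15 s of processing per batch, `schedulingDelayMs(10, 10000L, 15000L)` is 50000 and `activeBatches(10, 10000L, 15000L)` is 5; with 8 s of processing the delay stays at 0 and only one batch is active.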




[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2020-10-18 Thread Shixiong Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216341#comment-17216341
 ] 

Shixiong Zhu commented on SPARK-21065:
--

`spark.streaming.concurrentJobs` is not safe. Fixing it requires fundamental 
system changes. We don't have any plan for this.




[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2020-10-18 Thread Sachit Murarka (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216330#comment-17216330
 ] 

Sachit Murarka commented on SPARK-21065:


[~zsxwing], any idea whether concurrentJobs still causes issues in the 2.4 
release of Spark as well?




[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2020-02-19 Thread Artur Sukhenko (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040456#comment-17040456
 ] 

Artur Sukhenko commented on SPARK-21065:


[~zsxwing] Is `spark.streaming.concurrentJobs` still risky in 2.2/2.3/2.4?

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Scheduler, Web UI
>Affects Versions: 2.1.0
>Reporter: Dan Dutrow
>Priority: Major
>






[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-16 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16052242#comment-16052242
 ] 

Shixiong Zhu commented on SPARK-21065:
--

Please don't use `spark.streaming.concurrentJobs` if possible. It doesn't work 
with many features, and can also cause data loss.

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Scheduler, Web UI
>Affects Versions: 2.1.0
>Reporter: Dan Dutrow
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-12 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16046649#comment-16046649
 ] 

Dan Dutrow commented on SPARK-21065:


Something to note: If one batch's processing time exceeds the batch interval, 
then a second batch could begin before the first is complete. This behavior is 
fine for us, and offers the ability to catch up if something gets delayed, 
but may confuse the scheduler.
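
The overlap described here can be sketched with a small deterministic simulation. It is not Spark's scheduler: `OverlapSketch`, `simulate`, `Run`, and the fixed per-batch durations are hypothetical, and each batch is treated as a single job that runs on the earliest-free slot out of `slots` (standing in for the value of `spark.streaming.concurrentJobs`).

```scala
object OverlapSketch {
  // Generation, start, and finish time (ms) of one simulated batch.
  final case class Run(batchTimeMs: Long, startMs: Long, finishMs: Long)

  // Batches are generated every intervalMs; batch i takes durationsMs(i) to process.
  // Each batch starts once it has been generated and a slot is free, whichever is later.
  // Requires slots >= 1.
  def simulate(intervalMs: Long, durationsMs: Seq[Long], slots: Int): Seq[Run] = {
    val slotFreeAt = Array.fill(slots)(0L)
    durationsMs.zipWithIndex.map { case (duration, i) =>
      val batchTime = i * intervalMs
      val slot = slotFreeAt.indexOf(slotFreeAt.min) // earliest-free slot
      val start = math.max(batchTime, slotFreeAt(slot))
      val finish = start + duration
      slotFreeAt(slot) = finish
      Run(batchTime, start, finish)
    }
  }
}
```

With `simulate(10000L, Seq(25000L, 5000L, 5000L), 2)`, the second batch starts at 10000 ms and finishes at 15000 ms while the first is still running until 25000 ms, so in this model completions need not arrive in batch-time order. With `slots = 1` the second batch waits until 25000 ms instead.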

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Scheduler
>Affects Versions: 2.1.0
>Reporter: Dan Dutrow
>


