[jira] [Resolved] (SPARK-21999) ConcurrentModificationException - Spark Streaming

2017-10-06 Thread Steve Loughran (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steve Loughran resolved SPARK-21999.

Resolution: Won't Fix

> ConcurrentModificationException - Spark Streaming
> -------------------------------------------------
>
> Key: SPARK-21999
> URL: https://issues.apache.org/jira/browse/SPARK-21999
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Michael N
> Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting
> ConcurrentModificationException intermittently. When it occurs, Spark does
> not honor the specified value of spark.task.maxFailures; it aborts the
> current batch and fetches the next one, which results in lost data. The
> exception stack is listed below.
> This instance of ConcurrentModificationException is similar to the issue at
> https://issues.apache.org/jira/browse/SPARK-17463, which was about
> serialization of accumulators in heartbeats. However, my Spark Streaming
> app does not use accumulators.
> The stack trace listed below occurred on the Spark master, in the Spark
> Streaming driver, at the time of the data loss.
> From the line of code in the first stack trace, can you tell which object
> Spark was trying to serialize? What is the root cause of this issue?
> Because this issue results in lost data as described above, could you have
> it fixed ASAP?
> Thanks.
> Michael N.
> 
> Stack trace of Spark Streaming driver:
> ERROR JobScheduler:91: Error generating jobs for time 150522493 ms
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
>   at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>   at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>   at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 

[jira] [Resolved] (SPARK-21999) ConcurrentModificationException - Spark Streaming

2017-10-04 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-21999.
---
Resolution: Not A Problem

I am not sure what you mean by the bold-faced "asynchronously". Your app is
modifying a collection asynchronously w.r.t. Spark, right? You said as much.
This is why I'm asking you to back up and present a specific example, on the
mailing list.
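
For concreteness, here is a minimal, hypothetical Scala sketch of that failure
mode (all names are invented for illustration, and `stream` is assumed to be a
DStream[String]): a mutable collection on the driver is captured in a DStream
closure while another driver thread keeps mutating it. The driver re-serializes
the closure for every batch, and java.util.ArrayList's serialization throws
ConcurrentModificationException if the list is modified while being written,
which then surfaces as the "Task not serializable" error in the stack trace
above.

    import java.util.{ArrayList => JArrayList}
    import scala.collection.JavaConverters._
    import org.apache.spark.streaming.dstream.DStream

    // Shared, mutable driver-side state, refreshed by a non-Spark thread.
    val rules = new JArrayList[String]()
    new Thread(new Runnable {
      def run(): Unit = while (true) {
        rules.synchronized { rules.add("refreshed-rule") }
        Thread.sleep(1000)
      }
    }).start()

    // Unsafe: the closure captures `rules` directly, so each batch's closure
    // serialization races against the refresh thread's mutations.
    def unsafe(stream: DStream[String]): DStream[String] =
      stream.mapPartitions(iter => iter.filter(rec => rules.contains(rec)))

    // Safer: take an immutable snapshot per batch inside transform(), which
    // runs on the driver, so only a stable copy is captured and serialized.
    def safer(stream: DStream[String]): DStream[String] =
      stream.transform { rdd =>
        val snapshot = rules.synchronized { rules.asScala.toList }
        rdd.filter(rec => snapshot.contains(rec))
      }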

The map function is not a flaw in Spark, Scala, or any other functional
language, and it is not the reason flatMap exists. If that's the same
misunderstanding going on here, then, no, this isn't a problem.
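
For reference, the distinction in plain Scala: map produces exactly one output
element per input, while flatMap produces zero or more and flattens the
results; the two exist for different result shapes, not as workarounds for
each other.

    val lines = List("a b", "c")
    lines.map(_.split(" ").toList)      // List(List("a", "b"), List("c")) -- one list per line
    lines.flatMap(_.split(" ").toList)  // List("a", "b", "c")             -- flattened words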



[jira] [Resolved] (SPARK-21999) ConcurrentModificationException - Spark Streaming

2017-09-28 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-21999.
---
Resolution: Not A Problem
