[ 
https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195810#comment-16195810
 ] 

Michael N commented on SPARK-21999:
-----------------------------------

Steve, your claim that I told the project "their design is wrong" is factually 
incorrect. If you read this ticket completely, you'd see that I first posted 
questions to get more insights into the reasons for the current design. Those 
questions are re-listed below. Yet Sean just closed the ticket. If he did not 
have the answers to those questions, he should have let someone else who knows 
the answers post them instead.

Some related questions that would provide insights into this issue are

1. Based on the stack trace, what triggered Spark to do serialization of the 
application objects on the master node ?

2. Why does Spark do this type of serialization asynchronously ?

3. What other conditions trigger Spark to do this type of serialization 
asynchronously ?

4. Can it be configured to do this type of serialization synchronously instead ?

5. Excluding application code, what list objects does Spark use that would be 
part of this type of serialization ?

---

1. In the first place, why does Spark serialize the application objects 
asynchronously while the streaming application is running continuously from 
batch to batch ?

2. If Spark needs to do this type of serialization at all, why does it not do 
at the end of the batch ?



> ConcurrentModificationException - Spark Streaming
> -------------------------------------------------
>
>                 Key: SPARK-21999
>                 URL: https://issues.apache.org/jira/browse/SPARK-21999
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Michael N
>            Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8.  I am getting 
> ConcurrentModificationException intermittently.  When it occurs, Spark does 
> not honor the specified value of spark.task.maxFailures. So Spark aborts the 
> current batch  and fetch the next batch, so it results in lost data. Its 
> exception stack is listed below. 
> This instance of ConcurrentModificationException is similar to the issue at 
> https://issues.apache.org/jira/browse/SPARK-17463, which was about 
> Serialization of accumulators in heartbeats.  However, my Spark stream app 
> does not use accumulators. 
> The stack trace listed below occurred on the Spark master in Spark streaming 
> driver at the time of data loss.   
> From the line of code in the first stack trace, can you tell which object 
> Spark was trying to serialize ?  What is the root cause for this issue  ?  
> Because this issue results in lost data as described above, could you have 
> this issue fixed ASAP ?
> Thanks.
> Michael N.,
> ----------------
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at scala.Option.map(Option.scala:146)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>       at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>       at scala.util.Try$.apply(Try.scala:192)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
>       at java.util.ArrayList.writeObject(ArrayList.java:766)
>       at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>       ... 60 more
> 2017-09-12 07:02:10.029 ERROR 
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at scala.Option.map(Option.scala:146)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>       at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>       at scala.util.Try$.apply(Try.scala:192)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
>       at java.util.ArrayList.writeObject(ArrayList.java:766)
>       at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>       ... 60 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to