[ 
https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192135#comment-16192135
 ] 

Michael N edited comment on SPARK-21999 at 10/4/17 10:32 PM:
-------------------------------------------------------------

The posted questions were intended to show why this is a design issue, and you 
have not been able to provide the answers. So before you respond further and 
claim otherwise, please re-read the posted questions and answer them first.

1. In the first place, why does Spark serialize the application's objects 
***asynchronously*** while the streaming application is running continuously 
from batch to batch?

2. If Spark needs to do this type of serialization at all, why does it not do 
it ***synchronously*** at the end of each batch? (See the sketch below for the 
kind of scenario these questions are about.)
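
To make this concrete, here is a minimal, hypothetical sketch of the kind of 
scenario these questions are about (the collection, the input source, and the 
background thread are invented for illustration; this is not the actual 
application code): a mutable driver-side ArrayList is captured in a DStream 
closure, and the driver serializes that closure for every batch while another 
driver thread is still mutating the list, which can surface exactly the 
ArrayList.writeObject ConcurrentModificationException seen in the stack trace.

    import java.util.ArrayList
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CmeSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("cme-sketch").setMaster("local[2]")
        val ssc  = new StreamingContext(conf, Seconds(5))

        // Mutable driver-side state captured by the closure below.
        val lookup = new ArrayList[String]()

        // Any input DStream; a socket stream is only a placeholder here.
        val lines = ssc.socketTextStream("localhost", 9999)

        // The closure captures `lookup`; the driver serializes it while
        // generating the job for each batch (ClosureCleaner.ensureSerializable).
        lines.mapPartitions(iter => iter.filter(x => lookup.contains(x)))
             .print()

        // A separate driver thread mutating `lookup` concurrently with that
        // serialization can throw ConcurrentModificationException from
        // ArrayList.writeObject, as in the reported trace.
        new Thread(new Runnable {
          def run(): Unit = while (true) lookup.add(System.nanoTime().toString)
        }).start()

        ssc.start()
        ssc.awaitTermination()
      }
    }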

Here is an analogy to give more guidance as to why this is a design flaw. 
Spark's older map API has a major design flaw: it makes a function call for 
every single object. Its implementation matched its design, but the flaw is 
that it incurs massive overhead when there are millions or billions of objects, 
because the same function has to be invoked millions or billions of times, once 
per object.

On the other hand, the newer mapPartitions API makes one function call for a 
whole partition's worth of objects via an Iterator, as sketched below.
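
A minimal sketch of the difference, assuming an existing SparkContext named sc 
(the numbers are illustrative only):

    // One function call per record: the lambda is invoked 1,000,000 times.
    val perRecord = sc.parallelize(1 to 1000000, 8).map(x => x * 2)

    // One function call per partition: the function is invoked 8 times, once
    // per partition, and receives that partition's records as an Iterator, so
    // any per-partition setup cost is paid only once.
    val perPartition = sc.parallelize(1 to 1000000, 8).mapPartitions { iter =>
      iter.map(x => x * 2)
    }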



was (Author: michaeln_apache):
The posted questions were intended to show why this is a design issue, and you 
have not been able to provide the answers. So before you respond further and 
claim otherwise, please re-read the posted questions and answer them first.

1. In the first place, why does Spark serialize the application's objects 
***asynchronously*** while the streaming application is running continuously 
from batch to batch?

2. If Spark needs to do this type of serialization at all, why does it not do 
it ***synchronously*** at the end of each batch?

Here is an analogy to give more guidance as to why this is a design flaw. 
Spark's older map API has a major design flaw: it makes a function call for 
every single object. Its implementation matched its design, but the flaw is 
that it incurs massive overhead when there are millions or billions of objects. 
On the other hand, the newer mapPartitions API makes one function call for a 
whole partition's worth of objects via an Iterator.


> ConcurrentModificationException - Spark Streaming
> -------------------------------------------------
>
>                 Key: SPARK-21999
>                 URL: https://issues.apache.org/jira/browse/SPARK-21999
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Michael N
>            Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting 
> ConcurrentModificationException intermittently. When it occurs, Spark does 
> not honor the specified value of spark.task.maxFailures; it aborts the 
> current batch and fetches the next one, which results in lost data. The 
> exception stack is listed below. 
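> For reference, a minimal sketch of how spark.task.maxFailures is normally 
> set (the values and app name here are illustrative only, assuming the usual 
> org.apache.spark imports):
>     val conf = new SparkConf()
>       .setAppName("streaming-app")
>       .set("spark.task.maxFailures", "8")  // per-task retry limit on executors
>     val ssc = new StreamingContext(conf, Seconds(5))
> Note that this setting governs task retries on executors, while the stack 
> trace below is thrown on the driver as the batch's jobs are being generated, 
> before any task is launched. 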
> This instance of ConcurrentModificationException is similar to the issue at 
> https://issues.apache.org/jira/browse/SPARK-17463, which was about 
> serialization of accumulators in heartbeats. However, my Spark Streaming 
> application does not use accumulators. 
> The stack trace listed below occurred on the Spark master, in the Spark 
> Streaming driver, at the time of the data loss.   
> From the line of code in the first stack trace, can you tell which object 
> Spark was trying to serialize? What is the root cause of this issue? Because 
> this issue results in lost data as described above, could you have it fixed 
> ASAP?
> Thanks.
> Michael N.,
> ----------------
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at scala.Option.map(Option.scala:146)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>       at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>       at scala.util.Try$.apply(Try.scala:192)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
>       at java.util.ArrayList.writeObject(ArrayList.java:766)
>       at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>       ... 60 more
> 2017-09-12 07:02:10.029 ERROR 
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>       at scala.Option.map(Option.scala:146)
>       at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>       at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>       at scala.util.Try$.apply(Try.scala:192)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
>       at java.util.ArrayList.writeObject(ArrayList.java:766)
>       at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>       ... 60 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

