[ https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199871#comment-16199871 ]

Hyukjin Kwon commented on SPARK-22163:
--------------------------------------

Please stop doing this in JIRA ... I think https://issues.apache.org/jira/browse/SPARK-22163?focusedCommentId=16195830&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16195830 explains it well enough.

> Design Issue of Spark Streaming that Causes Random Run-time Exception
> ---------------------------------------------------------------------
>
>                 Key: SPARK-22163
>                 URL: https://issues.apache.org/jira/browse/SPARK-22163
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark Streaming
> Kafka
> Linux
>            Reporter: Michael N
>
> The application's objects can contain Lists that are modified dynamically at 
> run time. However, the Spark Streaming framework serializes the 
> application's objects asynchronously while the application runs. This causes 
> random run-time exceptions on a List whenever the framework happens to 
> serialize the application's objects at the same moment the application is 
> modifying that List in one of its own objects.
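> To make this failure mode concrete, here is a minimal standalone sketch 
> (plain Java, no Spark involved; class and variable names are illustrative 
> only) of the same race: one thread serializes an ArrayList while another 
> mutates it. ArrayList.writeObject detects the concurrent modification via 
> the list's modCount and throws ConcurrentModificationException:
> {code}
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectOutputStream;
> import java.util.ArrayList;
> import java.util.List;
>
> public class ConcurrentSerializationDemo {
>     public static void main(String[] args) throws Exception {
>         List<String> list = new ArrayList<>();
>         for (int i = 0; i < 100000; i++) {
>             list.add("a-" + i);
>         }
>
>         // Mutator thread: rotates the list, like the driver-side update
>         // described later in this ticket.
>         Thread mutator = new Thread(() -> {
>             while (!Thread.currentThread().isInterrupted()) {
>                 String head = list.remove(0);
>                 list.add(head);
>             }
>         });
>         mutator.start();
>
>         // Serializer: plays the role of the framework thread that
>         // checkpoints application state. This typically fails with
>         // ConcurrentModificationException because the list's modCount
>         // changes while writeObject is iterating over it.
>         try (ObjectOutputStream out =
>                  new ObjectOutputStream(new ByteArrayOutputStream())) {
>             out.writeObject(list);
>         } finally {
>             mutator.interrupt();
>         }
>     }
> }
> {code}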
> In fact, there are multiple reported bugs containing
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject
> that are permutations of the same root cause. So the design issue in the 
> Spark Streaming framework is that it performs this serialization 
> asynchronously. Instead, it should either
> 1. do this serialization synchronously, which is preferred because it 
> eliminates the issue completely; or
> 2. allow each application to configure whether this serialization is done 
> synchronously or asynchronously, depending on the nature of the application.
> Also, the Spark documentation should describe the conditions that trigger 
> Spark to do this type of serialization asynchronously, so applications can 
> work around them until a fix is provided; a sketch of one such workaround 
> follows.
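> One such workaround (a sketch, assuming the shared state is a List of 
> strings as in the reproduction code later in this ticket) is to hold the 
> state in a CopyOnWriteArrayList. Its writeObject serializes a point-in-time 
> snapshot of the backing array, so a serializer on another thread never sees 
> a half-applied mutation and never throws ConcurrentModificationException. 
> The trade-offs: every mutation copies the array, and a multi-step update 
> (e.g., remove then add) can still be captured between its steps.
> {code}
> import java.util.List;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class SnapshotSafeState {
>     // Each mutation atomically replaces the backing array, so concurrent
>     // readers and serializers always observe a consistent snapshot.
>     private final List<String> appStringList = new CopyOnWriteArrayList<>();
>
>     public SnapshotSafeState() {
>         for (int i = 0; i < 1000; i++) {
>             appStringList.add("a-" + i);
>         }
>     }
>
>     // The same rotation performed in the reproduction code's foreachRDD
>     // callback, now safe against a concurrent serializer.
>     public void rotate() {
>         String head = appStringList.remove(0);
>         appStringList.add(head);
>     }
> }
> {code}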
> ===
> Vadim Semenov and Steve Loughran, per your inquiries in ticket 
> https://issues.apache.org/jira/browse/SPARK-21999, I am posting the reply 
> here because this issue involves Spark's design and not necessarily its code 
> implementation.
> —
> My application does not spin up its own thread. All the threads are 
> controlled by Spark.
> Batch interval = 5 seconds
> Batch #3
> 1. Driver - Spark Thread #1 - starts batch #3 and blocks until all slave 
> threads are done with this batch
> 2. Slave A - Spark Thread #2. Say it takes 10 seconds to complete.
> 3. Slave B - Spark Thread #3. Say it takes 1 minute to complete.
> 4. Neither thread #1 for the driver nor thread #2 for Slave A jumps ahead 
> to process batch #4. Instead, they wait for thread #3 until it is done. => 
> So there is already synchronization among the threads within the same 
> batch, and batch-to-batch processing is synchronous.
> 5. After Spark Thread #3 is done, the driver does other processing to finish 
> the current batch. In my case, it updates a list of objects.
> The above steps repeat for the next batch #4 and subsequent batches.
> Based on the exception stack trace, it looks like in step 5, Spark has 
> another thread #4 that serializes application objects asynchronously. That 
> causes random occurrences of ConcurrentModificationException, because the 
> list of objects is being changed at the same time by Spark's own thread #1 
> for the driver.
> So the issue is not that my application "is modifying a collection 
> asynchronously w.r.t. Spark" as Sean kept claiming. Instead, it is the 
> asynchrony among Spark's own different threads within the same batch that 
> causes this issue.
> Since Spark controls all the threads and their synchronization, this is a 
> design issue in Spark: the lack of synchronization between threads #1 and 
> #4 is what triggers the ConcurrentModificationException. That is the root 
> cause of this issue.
> Further, even if the application never modified a List, in step 5 the 
> driver could still be modifying several plain fields, say two integers. The 
> driver, on thread #1, could have updated integer X but not yet integer Y at 
> the moment Spark's thread #4 asynchronously serializes the application 
> objects. The persisted serialized data would then not match the actual 
> data. This results in a permutation of this issue with a false-positive 
> condition, where the serialized checkpoint data is only partially correct.
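> This second failure mode is also easy to demonstrate outside Spark. The 
> sketch below (class and field names are illustrative) serializes an object 
> on one thread while another thread updates two fields that are supposed to 
> stay equal. No exception is thrown; the snapshot is just silently 
> inconsistent, and whether that actually happens in a given run is 
> timing-dependent:
> {code}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
>
> public class TornSnapshotDemo {
>     static class Counters implements Serializable {
>         int x;
>         int y; // invariant: x == y after every complete update
>     }
>
>     public static void main(String[] args) throws Exception {
>         Counters c = new Counters();
>
>         // Mutator (thread #1 in the ticket's terminology): updates X,
>         // then Y; the invariant only holds once both increments are done.
>         Thread mutator = new Thread(() -> {
>             while (!Thread.currentThread().isInterrupted()) {
>                 c.x++;
>                 c.y++;
>             }
>         });
>         mutator.start();
>
>         // Serializer (thread #4): snapshots c repeatedly and checks the
>         // invariant in each deserialized copy.
>         for (int i = 0; i < 100000; i++) {
>             ByteArrayOutputStream buf = new ByteArrayOutputStream();
>             try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
>                 out.writeObject(c);
>             }
>             try (ObjectInputStream in = new ObjectInputStream(
>                      new ByteArrayInputStream(buf.toByteArray()))) {
>                 Counters copy = (Counters) in.readObject();
>                 if (copy.x != copy.y) {
>                     System.out.println("torn snapshot: x=" + copy.x
>                             + " y=" + copy.y);
>                     break;
>                 }
>             }
>         }
>         mutator.interrupt();
>     }
> }
> {code}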
> One solution for both issues is to change Spark's design so that the 
> serialization performed by Spark's thread #4 is configurable per 
> application to run either asynchronously or synchronously with Spark's 
> thread #1. That way, each application can decide based on the nature of its 
> business requirements and required throughput.
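> To illustrate what "synchronous" would mean here, the sketch below uses a 
> plain lock as a stand-in for whatever coordination Spark would add 
> internally; it is illustrative only, not an existing Spark API or 
> configuration. Because mutation and serialization acquire the same lock, 
> the serializer runs strictly before or after an update, never in the middle 
> of one:
> {code}
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectOutputStream;
> import java.util.ArrayList;
> import java.util.List;
>
> public class SynchronousCheckpointSketch {
>     private final Object checkpointLock = new Object();
>     private final List<String> state = new ArrayList<>();
>
>     public SynchronousCheckpointSketch() {
>         for (int i = 0; i < 1000; i++) {
>             state.add("a-" + i);
>         }
>     }
>
>     // Thread #1 (driver work): every mutation holds the lock.
>     public void rotate() {
>         synchronized (checkpointLock) {
>             String head = state.remove(0);
>             state.add(head);
>         }
>     }
>
>     // Thread #4 (checkpointing): serialization holds the same lock, so it
>     // always captures a fully consistent state.
>     public byte[] checkpoint() throws Exception {
>         synchronized (checkpointLock) {
>             ByteArrayOutputStream buf = new ByteArrayOutputStream();
>             try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
>                 out.writeObject(state);
>             }
>             return buf.toByteArray();
>         }
>     }
> }
> {code}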
> ===
> The code is listed below. Due to the asynchronous nature of Spark's thread 
> operations and differences in hardware, the issue described in this ticket 
> occurs randomly, so you may need to tweak the batch duration.
> {code}
> package test;
>
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.regex.Pattern;
> import scala.Tuple2;
> import kafka.serializer.StringDecoder;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.streaming.api.java.*;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import org.apache.spark.streaming.Durations;
> /**
>  * Consumes messages from one or more topics in Kafka and does wordcount.
>  * Usage: JavaDirectKafkaWordCount_Extended <brokers> <topics>
>  *   <brokers> is a list of one or more Kafka brokers
>  *   <topics> is a list of one or more Kafka topics to consume from
>  *
>  * Example:
>  *    $ bin/run-example streaming.JavaDirectKafkaWordCount_Extended \
>  *      broker1-host:port,broker2-host:port topic1,topic2
>  */
> public final class JavaDirectKafkaWordCount_Extended
>         implements VoidFunction<JavaPairRDD<String, Integer>> {
>    
>   private static final Pattern SPACE = Pattern.compile(" ");
>  
>   private List<String> appStringList;
>   public static void main(String[] args) throws Exception {
>       try {
>         if (args.length < 2) {
>             System.err.println(
>                 "Usage: JavaDirectKafkaWordCount_Extended <brokers> <topics>\n"
>                 + "  <brokers> is a list of one or more Kafka brokers\n"
>                 + "  <topics> is a list of one or more Kafka topics to consume from\n\n");
>             System.exit(1);
>         }
>
>         JavaDirectKafkaWordCount_Extended app = new JavaDirectKafkaWordCount_Extended();
>         app.setupStreamApp(args);
>       } catch (Throwable exc) {
>           exc.printStackTrace();
>       }
>   }
>  
>   private void setupStreamApp (String[] args) throws InterruptedException {   
>   
>     // StreamingExamples.setStreamingLogLevels();
>         String brokers = args[0];
>         String topics = args[1];
>         // Create a list of strings with dummy values.
>         appStringList = new ArrayList<>();
>         for (int i = 0; i < 1000; i++) {
>             appStringList.add("a-" + i);
>         }
>    
>         SparkConf sparkConf =
>             new SparkConf().setAppName("JavaDirectKafkaWordCount");
>
>         // Create the streaming context with a 10-second batch interval;
>         // tweak this value as needed to reproduce the race.
>         JavaStreamingContext jssc =
>             new JavaStreamingContext(sparkConf, Durations.seconds(10));
>         jssc.checkpoint("./test-checkpoint/");
>    
>         Set<String> topicsSet =
>             new HashSet<>(Arrays.asList(topics.split(",")));
>         Map<String, String> kafkaParams = new HashMap<>();
>         kafkaParams.put("metadata.broker.list", brokers);
>    
>         // Create direct kafka stream with brokers and topics
>         JavaPairInputDStream<String, String> messages =
>             KafkaUtils.createDirectStream(
>             jssc,
>             String.class,
>             String.class,
>             StringDecoder.class,
>             StringDecoder.class,
>             kafkaParams,
>             topicsSet
>         );
>    
>         // Get the lines, split them into words, count the words and print
>         JavaDStream<String> lines = messages.map(
>             new Function<Tuple2<String, String>, String>() {
>             private static final long serialVersionUID = 3769940753726592424L;
>             @Override
>               public String call(Tuple2<String, String> tuple2) {
>                 return tuple2._2();
>               }
>         });
>        
>         JavaDStream<String> words = lines.flatMap(
>             new FlatMapFunction<String, String>() {
>               @Override
>               public Iterator<String> call(String x) {
>                 return Arrays.asList(SPACE.split(x)).iterator();
>               }
>         });
>        
>         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>               new PairFunction<String, String, Integer>() {
>                 @Override
>                 public Tuple2<String, Integer> call(String s) {
>                   return new Tuple2<>(s, 1);
>                 }
>               }).reduceByKey(
>                 new Function2<Integer, Integer, Integer>() {
>                 @Override
>                 public Integer call(Integer i1, Integer i2) {
>                   return i1 + i2;
>                 }
>         });
>        
>         wordCounts.foreachRDD(this);
>                
>         // Start the computation
>         jssc.start();
>         jssc.awaitTermination();
>     }
>     @Override
>     public void call(JavaPairRDD<String, Integer> dataStream) throws Exception {
>         System.out.println("start foreachRDD");
>        
>         dataStream.foreachPartition(
>             new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
>             // Assuming there are two slave threads, this foreachPartition
>             // code corresponds to steps 2 and 3 (threads #2 and #3) of the
>             // high-level sequence in the textual description above the code.
>             @Override
>             public void call(Iterator<Tuple2<String, Integer>> tuples)
>                     throws Exception {
>                 if (tuples == null || !tuples.hasNext()) {
>                     return;
>                 }
>                
>                 while (tuples.hasNext()) {
>                     // The step below is unrelated to the issue; it just
>                     // simulates some work in the slave threads for completeness.
>                     System.out.println(tuples.next()._1);
>                 }                   
>             }               
>         });           
>         /* ===>
>          * The steps below correspond to step 5 of the high-level sequence,
>          * i.e., Spark's thread #1, as described in the textual description
>          * above the code.
>          *
>          * These steps are where ConcurrentModificationException occurs
>          * randomly, as explained in step 5 of the textual description of
>          * this ticket. For the purpose of this test, they update the list
>          * by simply rotating its entries.
>          *
>          * Based on the stack trace, Spark has another thread, i.e., thread
>          * #4, that asynchronously serializes the application objects during
>          * the next three operations. So thread #4 randomly encounters
>          * ConcurrentModificationException, because it tries to serialize
>          * appStringList while Spark's thread #1 is modifying the same list.
>          */
>         String tmp = appStringList.get(0);
>         appStringList.remove(0);
>         appStringList.add(tmp);
>         System.out.println("end foreachRDD");
>     }   
> }
> {code}
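> For reference, a typical way to launch the reproduction class (the jar 
> name, master, and broker/topic values are illustrative; since the code uses 
> the old kafka.serializer.StringDecoder API, the corresponding 0.8-style 
> spark-streaming-kafka artifact must be on the classpath):
> {code}
> $ bin/spark-submit \
>     --class test.JavaDirectKafkaWordCount_Extended \
>     --master local[4] \
>     word-count-extended.jar broker1:9092 topic1
> {code}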


