[ https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen closed SPARK-22163.
-----------------------------

Design Issue of Spark Streaming that Causes Random Run-time Exception
----------------------------------------------------------------------

                Key: SPARK-22163
                URL: https://issues.apache.org/jira/browse/SPARK-22163
            Project: Spark
         Issue Type: Bug
         Components: DStreams, Structured Streaming
   Affects Versions: 2.2.0
        Environment: Spark Streaming, Kafka, Linux
           Reporter: The Facts

Application objects can contain Lists, and those Lists can be modified dynamically while the application runs. However, the Spark Streaming framework serializes the application's objects asynchronously as the application runs. This causes random run-time exceptions on a List whenever the framework happens to serialize the application's objects while the application is modifying that List.

In fact, multiple bugs have been reported with

Caused by: java.util.ConcurrentModificationException
        at java.util.ArrayList.writeObject

that are permutations of the same root cause. The design issue in the Spark Streaming framework is that it performs this serialization asynchronously. Instead, it should either:

1. Perform this serialization synchronously. This is preferred, as it eliminates the issue completely. Or
2. Allow each application to configure whether this serialization is done synchronously or asynchronously, depending on the nature of the application.

Also, the Spark documentation should describe the conditions that trigger this kind of asynchronous serialization, so applications can work around them until a fix is provided.

===

Vadim Semenov and Steve Loughran, per your inquiries in ticket https://issues.apache.org/jira/browse/SPARK-21999, I am posting the reply here because this issue involves Spark's design and not necessarily its code implementation.

---

My application does not spin up its own threads. All the threads are controlled by Spark.

Batch interval = 5 seconds

Batch #3
1. Driver - Spark thread #1 - starts batch #3 and blocks until all slave threads are done with this batch.
2. Slave A - Spark thread #2. Say it takes 10 seconds to complete.
3. Slave B - Spark thread #3. Say it takes 1 minute to complete.
4. Neither thread #1 on the driver nor thread #2 for Slave A jumps ahead to process batch #4. Instead, they wait for thread #3 until it is done. => So there is already synchronization among the threads within the same batch. Also, batch to batch is synchronous.
5. After Spark thread #3 is done, the driver does other processing to finish the current batch. In my case, it updates a list of objects.

The above steps repeat for batch #4 and subsequent batches.

Based on the exception stack trace, it looks like in step 5 Spark has another thread, #4, that serializes application objects asynchronously. This causes random occurrences of ConcurrentModificationException, because the list of objects is being changed at the same time by Spark's own thread #1 on the driver.

So the issue is not that my application "is modifying a collection asynchronously w.r.t. Spark", as Sean kept claiming. Instead, it is the asynchrony among Spark's own threads within the same batch that causes this issue.

Since Spark controls all the threads and their synchronization, the lack of synchronization between threads #1 and #4 is a Spark design issue, and it is what triggers the ConcurrentModificationException. That is the root cause of this issue.
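For reference, the JDK-level race is easy to reproduce outside Spark. The sketch below is an illustration added for this discussion, not part of the original reproduction; the class name and loop counts are arbitrary. One thread rotates an ArrayList, exactly as the foreachRDD body in the reproduction code further down does, while another thread serializes it with ObjectOutputStream. ArrayList.writeObject detects the concurrent structural modification and throws the same exception quoted above.

{code}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Minimal stand-alone illustration of the race: serializing an ArrayList
// (as a checkpoint writer would) while another thread mutates it in place.
public class ConcurrentSerializationDemo {

    public static void main(String[] args) throws Exception {
        List<String> appStringList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            appStringList.add("a-" + i);
        }

        // Plays the role of "thread #1": keeps rotating the list in place.
        Thread mutator = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                String tmp = appStringList.get(0);
                appStringList.remove(0);
                appStringList.add(tmp);
            }
        });
        mutator.start();

        // Plays the role of "thread #4": repeatedly serializes the list.
        try {
            for (int i = 0; i < 10000; i++) {
                try (ObjectOutputStream oos =
                         new ObjectOutputStream(new ByteArrayOutputStream())) {
                    oos.writeObject(appStringList); // ArrayList.writeObject may throw here
                }
            }
            System.out.println("No exception this run; the race is timing-dependent.");
        } catch (ConcurrentModificationException e) {
            e.printStackTrace(); // same exception as in the reported stack traces
        } finally {
            mutator.interrupt();
        }
    }
}
{code}

Whether the exception actually fires in a given run depends on timing, which matches the random, hardware-dependent behaviour described in this ticket.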
Further, even if the application does not modify its list of objects, in step 5 the driver could be modifying several plain fields, say two integers. Thread #1 on the driver could have updated integer X but not yet integer Y when Spark's thread #4 asynchronously serializes the application objects. The persisted serialized data then does not match the actual data. This results in a permutation of this issue with a false-positive condition, where the serialized checkpoint data is only partially correct.

One solution for both issues is to modify Spark's design and make the serialization of application objects by Spark's thread #4 configurable per application, to run either asynchronously or synchronously with Spark's thread #1. That way, it is up to each individual application to decide, based on the nature of its business requirements and needed throughput.
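Until such a configuration exists, one possible application-side mitigation (a sketch only, not something proposed in this ticket; the class and method names are illustrative) is to never mutate a collection that Spark may serialize: copy it, modify the copy, and publish the new reference. Whichever snapshot the checkpointing thread serializes is then internally consistent, which avoids both the ConcurrentModificationException and the partially updated checkpoint state described above. The same idea extends to the integer X / integer Y case by grouping related fields into a single immutable state object that is swapped in one assignment.

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of a copy-and-swap update: the published list is never modified in
// place, so a thread that serializes the object graph sees either the old
// snapshot or the new one, never a list that changes mid-write.
public class CopyOnUpdateExample {

    // volatile so the newly published reference is promptly visible to other threads
    private volatile List<String> appStringList = new ArrayList<>();

    // Rotate the first entry to the end, as the foreachRDD body below does,
    // but on a private copy that is published only once it is fully built.
    public void rotate() {
        List<String> current = appStringList;            // read the current snapshot
        if (current.isEmpty()) {
            return;
        }
        List<String> updated = new ArrayList<>(current); // modify a private copy
        String tmp = updated.remove(0);
        updated.add(tmp);
        appStringList = Collections.unmodifiableList(updated); // publish the new snapshot
    }
}
{code}

This trades one list copy per batch for the guarantee that whatever gets serialized is consistent; java.util.concurrent.CopyOnWriteArrayList offers a similar trade-off when the list is mutated rarely relative to how often it is read.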
===

The reproduction code is listed below. Due to the asynchronous nature of Spark's thread operations and differences in hardware, the issue described in this ticket occurs randomly, so you may need to tweak the batch duration.

{code}
package test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
 *     topic1,topic2
 */
public final class JavaDirectKafkaWordCount_Extended implements
        VoidFunction<JavaPairRDD<String, Integer>> {

    private static final Pattern SPACE = Pattern.compile(" ");

    private List<String> appStringList;

    public static void main(String[] args) throws Exception {
        try {
            if (args.length < 2) {
                System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
                        "  <brokers> is a list of one or more Kafka brokers\n" +
                        "  <topics> is a list of one or more kafka topics to consume from\n\n");
                System.exit(1);
            }

            JavaDirectKafkaWordCount_Extended javaDirectKafkaWordCount =
                    new JavaDirectKafkaWordCount_Extended();
            javaDirectKafkaWordCount.setupStreamApp(args);
        } catch (Throwable exc) {
            exc.printStackTrace();
        }
    }

    private void setupStreamApp(String[] args) throws InterruptedException {

        // StreamingExamples.setStreamingLogLevels();
        String brokers = args[0];
        String topics = args[1];

        // Create a list of strings with dummy values.
        appStringList = new ArrayList<>();
        for (int i = 0; i < 1000; ) {
            appStringList.add("a-" + i++);
        }

        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");

        // Create context with a 10-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        jssc.checkpoint("./test-checkpoint/");

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 3769940753726592424L;

            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(SPACE.split(x)).iterator();
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.foreachRDD(this);

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }

    @Override
    public void call(JavaPairRDD<String, Integer> dataStream) throws Exception {
        System.out.println("start foreachRDD");

        dataStream.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
            // Assuming there are two slave threads, this foreachPartition code corresponds
            // to steps 2 and 3 for threads #2 and #3 in the high-level sequence described
            // in the text above the code.
            @Override
            public void call(Iterator<Tuple2<String, Integer>> tuples) throws Exception {
                if (tuples == null || !tuples.hasNext()) {
                    return;
                }

                while (tuples.hasNext()) {
                    // This step is not related to the issue. It just simulates some
                    // operation in the slave threads for completeness.
                    System.out.println(tuples.next()._1);
                }
            }
        });

        /* ===>
         * The steps below correspond to step 5 of the high-level sequence and Spark's
         * thread #1, as described in the text above the code.
         *
         * These steps are where ConcurrentModificationException occurs randomly, as
         * explained in step 5 of the textual description of this ticket. For the
         * purpose of this test, these steps update the list by simply rotating its
         * entries.
         *
         * Based on the stack trace, Spark has another thread, i.e. thread #4, that
         * asynchronously serializes the application objects during the next three
         * operations. So it randomly encounters ConcurrentModificationException,
         * because Spark's thread #4 tries to serialize appStringList while Spark's
         * thread #1 is modifying the same list.
         */
        String tmp = appStringList.get(0);
        appStringList.remove(0);
        appStringList.add(tmp);

        System.out.println("end foreachRDD");
    }
}
{code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org