Sorry, I forgot to mention: I was using Spark 2.0.2, Kafka 0.10, and nothing custom.
I will try to restate the initial question. Let's consider an example.

1. I create a stream and subscribe to a certain topic.

   val stream = KafkaUtils.createDirectStream(...)

2. I extract the actual data from the stream. For instance, word counts.

   val wordCounts = stream.map(record => (record.value(), 1))

3. Then I compute something and output the result to the console.

   val firstResult = stream.reduceByWindow(...)
   firstResult.print()

Once that is done, I would like to perform another computation on top of
wordCounts and output that result to the console as well. In my current
understanding, I cannot just reuse wordCounts from Step 2 and should instead
create a new stream with another group id and then define the second
computation there. Am I correct that if I add the next part, I can get
"ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access"?

   // another computation on wordCounts
   val secondResult = wordCounts.reduceByKeyAndWindow(...)
   secondResult.print()

Thanks,
Anton

2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:

> Hi,
>
> The usual general questions are:
> -- what is your Spark version?
> -- what is your Kafka version?
> -- do you use the "standard" Kafka consumer or try to implement something
> custom (your own multi-threaded consumer)?
>
> The freshest docs:
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> AFAIK, yes, you should use a unique group id for each stream (Kafka 0.10!):
>
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
> anton.okolnyc...@gmail.com> wrote:
>
>> Hi,
>>
>> I am experimenting with Spark Streaming and Kafka. I would appreciate it
>> if someone could say whether the following assumption is correct.
>>
>> If I have multiple computations (each with its own output) on one stream
>> (created via KafkaUtils.createDirectStream), then there is a chance of
>> getting ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access. To solve this problem, I should create a new
>> stream with a different "group.id" for each computation.
>>
>> Am I right?
>>
>> Best regards,
>> Anton
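To make the "separate group.id per stream" advice from the thread concrete, here is a minimal Scala sketch of how the Kafka parameter maps for two independent computations could be built. The broker address, topic handling, and the helper `kafkaParamsFor` are my own illustrative assumptions, not part of the original thread; the `createDirectStream` calls that would consume each map are shown only in comments, since running them requires a live StreamingContext and broker.

```scala
import java.util.UUID

// Hypothetical helper: builds the kafkaParams map for one direct stream.
// "localhost:9092" is a placeholder broker address. The group.id is made
// unique per stream, as the 0.10 integration docs quoted above recommend.
def kafkaParamsFor(groupId: String): Map[String, Object] = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id"           -> groupId,
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// One parameter map per independent computation, each with its own group id:
val paramsA = kafkaParamsFor("word-counts-" + UUID.randomUUID())
val paramsB = kafkaParamsFor("windowed-counts-" + UUID.randomUUID())

// Each map would then back its own stream (and thus its own KafkaConsumer),
// e.g.:
//   val streamA = KafkaUtils.createDirectStream[String, String](
//     ssc, PreferConsistent, Subscribe[String, String](topics, paramsA))
// and likewise with paramsB for the second computation, so the two
// computations never share one consumer across threads.
```

Because each computation reads through its own consumer group, the two output operations no longer touch the same KafkaConsumer instance, which is what triggers the ConcurrentModificationException discussed above.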