Thanks for all your replies; now I have a complete picture.
2016-12-12 16:49 GMT+01:00 Cody Koeninger <c...@koeninger.org>:

> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream
>
> Use a separate group id for each stream, like the docs say.
>
> If you're doing multiple output operations and aren't caching, Spark
> is going to read from Kafka again each time, and if some of those
> reads are happening for the same group and the same topic-partition,
> it's not going to work.
>
> On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
> <oleksii.duk...@gmail.com> wrote:
> > Hi Anton,
> >
> > What is the command you run your Spark app with? Why not work with
> > wordCounts instead of stream in your second-stage operation? Can you
> > provide logs with the issue?
> >
> > ConcurrentModificationException is not a Spark issue; it means that
> > you use the same Kafka consumer instance from more than one thread.
> >
> > Additionally:
> >
> > 1) As I understand it, a new Kafka consumer is created every time you
> > call KafkaUtils.createDirectStream.
> > 2) If you assign the same group id to several consumer instances, then
> > the consumers will get different sets of messages on the same topic.
> > This is a kind of load balancing which Kafka provides with its
> > Consumer API.
> >
> > Oleksii
> >
> > On 11 December 2016 at 18:46, Anton Okolnychyi
> > <anton.okolnyc...@gmail.com> wrote:
> >>
> >> Sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10,
> >> and nothing custom.
> >>
> >> I will try to restate the initial question. Let's consider an example.
> >>
> >> 1. I create a stream and subscribe to a certain topic.
> >>
> >>     val stream = KafkaUtils.createDirectStream(...)
> >>
> >> 2. I extract the actual data from the stream. For instance, word counts.
> >>
> >>     val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >> 3. Then I compute something and output the result to the console.
> >>
> >>     val firstResult = stream.reduceByWindow(...)
> >>     firstResult.print()
> >>
> >> Once that is done, I would like to perform another computation on top
> >> of wordCounts and output that result again to the console. In my
> >> current understanding, I cannot just reuse wordCounts from Step 2 and
> >> should create a new stream with another group id and then define the
> >> second computation. Am I correct that if I add the next part, then I
> >> can get "ConcurrentModificationException: KafkaConsumer is not safe
> >> for multi-threaded access"?
> >>
> >>     // another computation on wordCounts
> >>     val secondResult = wordCounts.reduceByKeyAndWindow(...)
> >>     secondResult.output()
> >>
> >> Thanks,
> >> Anton
> >>
> >> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
> >>>
> >>> Hi,
> >>>
> >>> The usual general questions are:
> >>> -- what is your Spark version?
> >>> -- what is your Kafka version?
> >>> -- do you use the "standard" Kafka consumer or try to implement
> >>>    something custom (your own multi-threaded consumer)?
> >>>
> >>> The freshest docs:
> >>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> >>>
> >>> AFAIK, yes, you should use a unique group id for each stream (Kafka
> >>> 0.10!):
> >>>
> >>>     kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> >>>
> >>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
> >>> <anton.okolnyc...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I am experimenting with Spark Streaming and Kafka. I would appreciate
> >>>> it if someone could say whether the following assumption is correct.
> >>>>
> >>>> If I have multiple computations (each with its own output) on one
> >>>> stream (created via KafkaUtils.createDirectStream), then there is a
> >>>> chance of getting "ConcurrentModificationException: KafkaConsumer is
> >>>> not safe for multi-threaded access". To solve this problem, I should
> >>>> create a new stream with a different "group.id" for each computation.
> >>>>
> >>>> Am I right?
> >>>>
> >>>> Best regards,
> >>>> Anton
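To summarize the thread, below is a minimal sketch of the caching approach Cody describes: one direct stream with its own group id, cached before the two output operations so the second output does not trigger a fresh Kafka read for the same group and topic-partition. This is an illustrative sketch only, assuming Spark 2.0.2 with the spark-streaming-kafka-0-10 artifact on the classpath; the topic name ("words"), bootstrap servers, window sizes, and the object name are placeholders, not from the thread.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object MultipleOutputsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("multiple-outputs-sketch")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder connection settings -- adjust for your cluster.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // A separate group id per stream, as the docs and Cody recommend.
      "group.id"           -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset"  -> "latest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("words"), kafkaParams))

    val wordCounts = stream.map(record => (record.value(), 1))

    // Cache so the two output operations below reuse the fetched batch
    // instead of each reading from Kafka again with the same group id
    // and topic-partition (the failure mode described above).
    wordCounts.cache()

    wordCounts.reduceByKeyAndWindow(_ + _, Seconds(30)).print() // first output
    wordCounts.count().print()                                  // second output

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The alternative, per the thread, is to call KafkaUtils.createDirectStream a second time with a different "group.id" and derive the second computation from that separate stream.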