Thanks for all your replies; now I have a complete picture.


2016-12-12 16:49 GMT+01:00 Cody Koeninger <c...@koeninger.org>:

> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream
>
> Use a separate group id for each stream, like the docs say.
>
> If you're doing multiple output operations and aren't caching, Spark
> is going to read from Kafka again for each one, and if some of those
> reads happen for the same group and the same topic-partition, it's
> not going to work.
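>
> For what it's worth, a minimal sketch of that approach (assuming an
> existing StreamingContext ssc; the broker, topic, and group id below
> are placeholders, not anything from your app):
>
>     import org.apache.kafka.common.serialization.StringDeserializer
>     import org.apache.spark.streaming.kafka010.KafkaUtils
>     import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>     import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "localhost:9092",            // placeholder
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> "my_app_stream_1"                      // one group id per stream
>     )
>
>     val stream = KafkaUtils.createDirectStream[String, String](
>       ssc, PreferConsistent, Subscribe[String, String](Seq("my_topic"), kafkaParams))
>
>     // Map away from ConsumerRecord and cache, so the two output
>     // operations below reuse the cached blocks instead of each one
>     // triggering another read from Kafka.
>     val values = stream.map(_.value).cache()
>
>     values.count().print()
>     values.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()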
>
> On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
> <oleksii.duk...@gmail.com> wrote:
> > Hi Anton,
> >
> > What command do you run your Spark app with? Why not work with the data
> > instead of the stream in your second-stage operation? Can you provide
> > logs showing the issue?
> >
> > ConcurrentModificationException is not a Spark issue; it means that you are
> > using the same Kafka consumer instance from more than one thread.
> >
> > Additionally,
> >
> > 1) As I understand it, a new Kafka consumer is created every time you call
> > KafkaUtils.createDirectStream.
> > 2) If you assign the same group id to several consumer instances, then the
> > consumers will each get a different set of messages on the same topic. This
> > is a kind of load balancing which Kafka provides with its Consumer API
> > (see the sketch below).
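> >
> > A rough sketch of what (2) means in practice, assuming the standard 0-10
> > imports and an existing StreamingContext ssc; the group ids and topic
> > below are hypothetical, and kafkaParams stands for your usual
> > Map[String, Object] of consumer settings:
> >
> >     // With the *same* group.id in both maps, Kafka would balance the
> >     // topic's partitions across the two consumers, so each stream would
> >     // only see part of the messages. With distinct group.ids, each
> >     // stream independently sees every message on the topic.
> >     val params1 = kafkaParams + ("group.id" -> "app_stream_1")
> >     val params2 = kafkaParams + ("group.id" -> "app_stream_2")
> >
> >     val stream1 = KafkaUtils.createDirectStream[String, String](
> >       ssc, PreferConsistent, Subscribe[String, String](Seq("some_topic"), params1))
> >     val stream2 = KafkaUtils.createDirectStream[String, String](
> >       ssc, PreferConsistent, Subscribe[String, String](Seq("some_topic"), params2))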
> >
> > Oleksii
> >
> > On 11 December 2016 at 18:46, Anton Okolnychyi <
> anton.okolnyc...@gmail.com>
> > wrote:
> >>
> >> Sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> >> nothing custom.
> >>
> >>
> >> I will try to restate the initial question. Let's consider an example.
> >>
> >> 1. I create a stream and subscribe to a certain topic.
> >>
> >>     val stream = KafkaUtils.createDirectStream(...)
> >>
> >> 2. I extract the actual data from the stream. For instance, word counts.
> >>
> >>     val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >> 3. Then I compute something and output the result to console.
> >>
> >>     val firstResult = stream.reduceByWindow(...)
> >>     firstResult.print()
> >>
> >> Once that is done, I would like to perform another computation on top of
> >> wordCounts and output that result to the console as well. In my current
> >> understanding, I cannot just reuse wordCounts from Step 2 and should
> >> create a new stream with another group id and then define the second
> >> computation. Am I correct that if I add the next part, then I can get
> >> "ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access"?
> >>
> >>     // another computation on wordCounts
> >>     val secondResult = wordCounts.reduceByKeyAndWindow(...)
> >>     secondResult.print()
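> >>
> >> For completeness, this is a minimal version of the whole pipeline I have
> >> in mind; the window durations and reduce functions below are just
> >> placeholders, and I assume the usual Kafka 0.10 imports, an existing
> >> StreamingContext ssc, and topics/kafkaParams as in the integration guide:
> >>
> >>     import org.apache.spark.streaming.Seconds
> >>
> >>     val stream = KafkaUtils.createDirectStream[String, String](
> >>       ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
> >>
> >>     val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >>     // first computation and output (reducing over the record values)
> >>     val firstResult = stream.map(_.value())
> >>       .reduceByWindow(_ + " " + _, Seconds(30), Seconds(10))
> >>     firstResult.print()
> >>
> >>     // second computation on wordCounts with its own output
> >>     val secondResult =
> >>       wordCounts.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
> >>     secondResult.print()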
> >>
> >> Thanks,
> >> Anton
> >>
> >> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
> >>>
> >>> Hi,
> >>> The usual general questions are:
> >>> -- what is your Spark version?
> >>> -- what is your Kafka version?
> >>> -- do you use the "standard" Kafka consumer or are you trying to implement
> >>> something custom (your own multi-threaded consumer)?
> >>>
> >>> The freshest docs:
> >>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> >>>
> >>> AFAIK, yes, you should use a unique group id for each stream (KAFKA 0.10 !!!)
> >>>>
> >>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> >>>
> >>>
> >>>
> >>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
> >>> <anton.okolnyc...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I am experimenting with Spark Streaming and Kafka. I would appreciate
> >>>> it if someone could say whether the following assumption is correct.
> >>>>
> >>>> If I have multiple computations (each with its own output) on one
> >>>> stream (created with KafkaUtils.createDirectStream), then there is a
> >>>> chance of getting "ConcurrentModificationException: KafkaConsumer is
> >>>> not safe for multi-threaded access". To solve this problem, I should
> >>>> create a new stream with a different "group.id" for each computation.
> >>>>
> >>>> Am I right?
> >>>>
> >>>> Best regards,
> >>>> Anton
> >>>
> >>>
> >>
> >
>
