sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
nothing custom.


I will try restate the initial question. Let's consider an example.

1. I create a stream and subscribe to a certain topic.

    val stream = KafkaUtils.createDirectStream(...)

2. I extract the actual data from the stream. For instance, word counts.

    val wordCounts = stream.map(record => (record.value(), 1))

3. Then I compute something and output the result to console.

    val firstResult = stream.reduceByWindow(...)
    firstResult.print()

Once that is done, I would like to perform another computation on top of
wordCounts and output that result again to console. In my current
understanding, I cannot just reuse wordCounts from Step 2 and should create
a new stream with another group id and then define the second computation.
Am I correct that if add the next part, then I can get "
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access"?

    // another computation on wordCounts
    val secondResult = wordCounts.reduceByKeyAndWindow(...)
    secondResult.output()

Thanks,
Anton

2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:

> Hi,
> Usual general questions are:
> -- what is your Spark version?
> -- what is your Kafka version?
> -- do you use "standard" Kafka consumer or try to implement something
> custom (your own multi-threaded consumer)?
>
> The freshest docs https://spark.apache.org/docs/
> latest/streaming-kafka-0-10-integration.html
>
> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)
>
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>
>>
>
> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
> anton.okolnyc...@gmail.com> wrote:
>
>> Hi,
>>
>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>> someone can say whether the following assumption is correct.
>>
>> If I have multiple computations (each with its own output) on one stream
>> (created as KafkaUtils.createDirectStream), then there is a chance to
>> have ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access.  To solve this problem, I should create a new stream
>> with different "group.id" for each computation.
>>
>> Am I right?
>>
>> Best regards,
>> Anton
>>
>
>

Reply via email to