The main issue is why you are seeing this exception in the first place. You
don't have to provide a group id.
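
For reference, below is a rough, untested sketch of passing group.id through
KafkaIO's updateConsumerProperties. The class/method wrapper, the topics
parameter, and the Guava coordinates in the comments are illustrative
assumptions rather than something taken from this thread, and the exact
generic types can differ by Beam version:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    // ImmutableMap comes from Guava (com.google.guava:guava); if the import
    // does not resolve, add that jar to the classpath, or skip it and use a
    // plain HashMap as shown further down.
    import com.google.common.collect.ImmutableMap;

    public class KafkaGroupIdExample {

      // Forwards group.id to the underlying Kafka consumer. KafkaIO itself
      // does not require it, so this is only needed if you want the consumer
      // to report under a particular group.
      static void configure(List<String> topics) {
        KafkaIO.read()
            .withBootstrapServers("host:9092")
            .withTopics(topics)
            // explicit <String, Object> so the map type matches the method signature
            .updateConsumerProperties(ImmutableMap.<String, Object>of("group.id", "myGroup"));

        // Equivalent without Guava, using a plain java.util.Map:
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "myGroup");
        KafkaIO.read()
            .withBootstrapServers("host:9092")
            .withTopics(topics)
            .updateConsumerProperties(props);
      }
    }

The HashMap variant sidesteps the unresolved ImmutableMap import entirely.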

On Sunday, May 8, 2016, Raghu Angadi <[email protected]> wrote:

> Please see the javadoc for the method. You can create a map any way you like.
> Do you need help creating a map in Java?
>
> On Sunday, May 8, 2016, amir bahmanyari <[email protected]> wrote:
>
>> Hi Raghu,
>> I am using Kafka 0.9.0. I added
>> updateConsumerProperties(*ImmutableMap*.of("group.id", "myGroup")), but
>> *ImmutableMap* does not resolve.
>>
>> I tried many ways to add the right jar to resolve it, but none of them worked.
>>
>> Thanks.
>> ------------------------------
>>
>> What version of Kafka are you using? KafkaIO does not need you to set a
>> group.id. Not sure why you are seeing this exception.
>>
>> Though KafkaIO does not need it, you can set it:
>>   KafkaIO.read().updateConsumerProperties(ImmutableMap.of("group.id", "temp"));
>>
>>
>>
>> On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <[email protected]>
>> wrote:
>>
>> Sorry missed the subject! :)
>>
>>
>> ------------------------------
>> *From:* amir bahmanyari <[email protected]>
>> *To:* "[email protected]" <[email protected]>
>> *Sent:* Saturday, May 7, 2016 9:31 PM
>> *Subject:*
>>
>> Hi colleagues,
>> Hope you are having a great weekend.
>> I get a Kafka configuration exception ("GroupId not being set") when
>> trying to run my Beam app (Flink runner) in a Flink cluster.
>>
>>
>> I couldn't find a reference to a method that sets a GroupId, similar to
>> what we do for
>> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords, etc.
>> The bottom of the stack trace is provided below.
>> How can I set a GroupId property for KafkaIO.read()?
>> Thanks for your help.
>>
>>         ... 25 more
>> Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
>>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>>         at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
>>         at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
>>         at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>>         at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>>         at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>>         at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
>>         at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
>>         ... 27 more
>>
