The main issue is why you are seeing this exception in the first place. You don't have to provide a group id.
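
If you still want to set one and ImmutableMap does not resolve (it comes from Guava, which is probably not on your classpath), a plain java.util.HashMap should work just as well. A minimal sketch follows. The class and helper names are made up for illustration, and I am assuming the method accepts a Map<String, Object>, so please check the javadoc for the exact signature:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class ConsumerProps {
    // Builds the consumer property overrides using only JDK collections (no Guava).
    static Map<String, Object> groupIdProps(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = groupIdProps("myGroup");
        System.out.println(props);
        // Pass this map wherever the thread below uses ImmutableMap.of(...),
        // i.e. as the argument to the consumer-properties update method on KafkaIO.read().
    }
}
```

This only builds the map; it does not explain why the consumer asks for group.id in the first place.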

On Sunday, May 8, 2016, Raghu Angadi <[email protected]> wrote:

> Please see the javadoc for the method. You can create a map any way you like.
> Do you need help creating a map in Java?
>
> On Sunday, May 8, 2016, amir bahmanyari <[email protected]> wrote:
>
>> Hi Raghu,
>> Am using Kafka 0.9.0. Added updateConsumerProperites(*ImmutableMap*.of("group.id", "myGroup").
>> I get *ImmutableMap* not resolved
>>
>> Tried many ways to put the right jar to resolve it. Didn't cut it.
>>
>> Thanks.
>> ------------------------------
>>
>> What version of Kafka are you using? KafkaIO does not need you to set a
>> group.id. Not sure why you are seeing this exception.
>>
>> Though KafkaIO does not need it, you can set it:
>> KafkaIO.read().updateConsumerProperites(ImmutableMap.of("group.id", "temp"));
>>
>> On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <[email protected]> wrote:
>>
>> Sorry missed the subject! :)
>>
>> ------------------------------
>> *From:* amir bahmanyari <[email protected]>
>> *To:* "[email protected]" <[email protected]>
>> *Sent:* Saturday, May 7, 2016 9:31 PM
>> *Subject:*
>>
>> Hi colleagues,
>> Hope you are having a great weekend.
>> I get a Kafka configuration exception, GroupId not being set, when
>> trying to run my Beam app (Flink runner) in a Flink cluster.
>>
>> I couldn't find a reference to a method that sets a GroupId similar to
>> what we do for
>> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
>> The bottom of the stack trace is provided below.
>> How can I set a GroupId property for KafkaIO.read()?
>> Thanks for your help.
>>
>> ... 25 more
>> Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
>>     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
>>     at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>>     at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
>>     ... 27 more
