Please see javadic for the method. You can create a map anyway you like. Do you need help creating a map in Java?
On Sunday, May 8, 2016, amir bahmanyari <[email protected]> wrote: > Hi Raghu, > Am using Kafka 0.9.0. Added updateConsumerProperites(*ImmutableMap*.of(" > group.id", "myGroup"). > I get *ImmutableMap *not resolved > > tried many ways to put the right jar to resolve it. Didnt cut it. > > Thanks. > ------------------------------ > > What version of Kafka are you using? KafkaIO does not need you to set a > group.id. Not sure why you are seeing this exception. > > Though KafkaIO does not need it, you can set it: > KafkaIO.read().updateConsumerProperites(ImmutableMap.of("group.id", > "temp")); > > > > On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <[email protected] > <javascript:_e(%7B%7D,'cvml','[email protected]');>> wrote: > > Sorry missed the subject! :) > > > ------------------------------ > *From:* amir bahmanyari <[email protected] > <javascript:_e(%7B%7D,'cvml','[email protected]');>> > *To:* "[email protected] > <javascript:_e(%7B%7D,'cvml','[email protected]');>" < > [email protected] > <javascript:_e(%7B%7D,'cvml','[email protected]');>> > *Sent:* Saturday, May 7, 2016 9:31 PM > *Subject:* > > Hi colleagues, > Hope you are having a great weekend. > I get a Kafka configuration exception, GroupId not being set, when trying > to run my Beam app (Flink runner) in a Flink cluster. > > > I couldn't find a reference to a method that seta a GroupId similar to > what we do for > withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc. > The bottom of the stack trace is provide below. > How can I set a GroupId property for KafkaIO.read()? > Thanks for your help. > > ... 25 more > Caused by:* org.apache.kafka.common.config.ConfigException: Missing > required configuration "group.id <http://group.id/>" which has no default > value.* > at > org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48) > at org.apache.kafka.clients.consumer.*ConsumerConfig* > .<init>(ConsumerConfig.java:194) > at org.apache.kafka.clients.consumer.*KafkaConsumer* > .<init>(KafkaConsumer.java:380) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350) > at org.apache.beam.sdk.io.*kafka.KafkaIO* > $Read$1.apply(KafkaIO.java:339) > at > org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337) > at > org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572) > at > org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165) > at > org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101) > ... 27 more > > > > > > >
