Which version of Kafka are you using? KafkaIO does not require you to set a
group.id, so I am not sure why you are seeing this exception.
Although KafkaIO does not need it, you can set it yourself:
KafkaIO.read().updateConsumerProperties(ImmutableMap.of("group.id", "temp"));
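
If you would rather not depend on Guava for this, here is a minimal sketch of the same idea using a plain java.util map. The class name ConsumerOverrides and the helper groupIdOverride are made up for illustration, and "temp" is just a placeholder group name. The KafkaIO call is left as a comment so the snippet compiles without Beam on the classpath; withBootstrapServers and withTopics are the ones from your own code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of Beam or Kafka.
public class ConsumerOverrides {

    // Builds the extra consumer settings to hand to KafkaIO.
    // "group.id" is the standard Kafka consumer config key.
    public static Map<String, Object> groupIdOverride(String groupId) {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("group.id", groupId);
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = groupIdOverride("temp");
        System.out.println(overrides);

        // In the pipeline (needs the Beam KafkaIO dependency, so it is
        // commented out here):
        // KafkaIO.read()
        //     .withBootstrapServers("host:9092")
        //     .withTopics(topics)
        //     .updateConsumerProperties(overrides);
    }
}
```

Any other consumer setting you need can go into the same map before it is passed in.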
On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <[email protected]> wrote:
> Sorry missed the subject! :)
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Saturday, May 7, 2016 9:31 PM
> *Subject:*
>
> Hi colleagues,
> Hope you are having a great weekend.
> I get a Kafka configuration exception, GroupId not being set, when trying
> to run my Beam app (Flink runner) in a Flink cluster.
>
>
> I couldn't find a reference to a method that sets a GroupId, similar to
> what we do for
> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
> The bottom of the stack trace is provided below.
> How can I set a GroupId property for KafkaIO.read()?
> Thanks for your help.
>
> ... 25 more
> Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
> at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
> at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
> at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
> at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
> at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
> at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
> ... 27 more
>