Hi colleagues,

Hope you are having a great weekend.

I get a Kafka configuration exception, GroupId not being set, when trying to
run my Beam app (Flink runner) in a Flink cluster.

I couldn't find a reference to a method that sets a GroupId, similar to what
we do with
withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
The bottom of the stack trace is provided below.

How can I set a GroupId property for KafkaIO.read()?

Thanks for your help.

    ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
    at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
    at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
    ... 27 more
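
For reference, here is a minimal sketch of the kind of override the exception seems to be asking for. It only builds a map of Kafka consumer properties containing "group.id". The class name KafkaGroupIdConfig, the helper consumerOverrides and the group name "my-beam-app" are made up for illustration. How the map is handed to KafkaIO.read() is an assumption: older Beam releases appear to expose updateConsumerProperties(Map<String, Object>) on the read transform and newer ones withConsumerConfigUpdates(Map<String, Object>), so please check the KafkaIO Javadoc for the version in use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or Kafka.
class KafkaGroupIdConfig {

    // Builds the consumer property overrides; "group.id" is the key named
    // in the ConfigException above.
    static Map<String, Object> consumerOverrides(String groupId) {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("group.id", groupId);
        return overrides;
    }

    public static void main(String[] args) {
        // "my-beam-app" is a placeholder group name.
        Map<String, Object> overrides = consumerOverrides("my-beam-app");
        System.out.println(overrides);

        // Assumed usage (unverified, depends on the Beam version):
        //   KafkaIO.read()
        //       .withBootstrapServers("host:9092")
        //       .withTopics(topics)
        //       .updateConsumerProperties(overrides);  // or withConsumerConfigUpdates(overrides)
    }
}
```

Keeping the overrides in a plain map means any other consumer setting can be added the same way, without depending on a dedicated builder method for each property.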