Hi colleagues,Hope you are having a great weekend.I get a Kafka configuration 
exception, GroupId not being set,  when trying to run my Beam app (Flink 
runner) in a Flink cluster.

I couldn't find a reference to a method that seta a GroupId  similar to what we 
do for withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords 
etc.The bottom of the stack trace is provide below.How can I set a GroupId 
property for KafkaIO.read()?Thanks for your help.
        ... 25 moreCaused by: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "group.id" which has no default value.        at 
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)        at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)    
    at 
org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)  
      at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)  
      at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)  
      at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)    
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)      
  at 
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
        at 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
        at 
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
        ... 27 more

Reply via email to