[ https://issues.apache.org/jira/browse/FLINK-27348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17526185#comment-17526185 ]
Liu commented on FLINK-27348:
-----------------------------

Have you committed the Kafka offset?

> Flink KafkaSource doesn't set groupId
> -------------------------------------
>
>                 Key: FLINK-27348
>                 URL: https://issues.apache.org/jira/browse/FLINK-27348
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Scala
>    Affects Versions: 1.14.4
>         Environment: OS: windows 8.1.
> Java version:
> java version "11.0.13" 2021-10-19 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
>
>            Reporter: Ahmet Gürbüz
>            Priority: Major
>         Attachments: image-2022-04-22-05-43-06-475.png,
> image-2022-04-22-05-44-56-494.png, image-2022-04-22-05-46-45-592.png,
> image-2022-04-22-05-52-04-760.png
>
>
> I have a very simple Flink application. I have installed Kafka locally
> and I am reading data from Kafka with Flink, using the KafkaSource class.
> Although I have assigned a group ID with setGroupId, this groupId does
> not appear in Kafka.
>
> {code:java}
> object FlinkKafkaSource extends App {
>   val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
>
>   case class Event(partitionNo: Long, eventTime: String, eventTimestamp: Long, userId: String, firstName: String)
>
>   implicit val readsEvent: Reads[Event] = Json.reads[Event]
>
>   env
>     .fromSource(KafkaSource.builder[Event]
>       .setBootstrapServers("localhost:9092")
>       .setTopics("flink-connection")
>       .setGroupId("test-group") // I can't see this groupId in kafka-consumer-groups
>       .setStartingOffsets(OffsetsInitializer.latest)
>       .setDeserializer(new KafkaRecordDeserializationSchema[Event] {
>         override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Event]): Unit = {
>           val rec = record.value.map(_.toChar).mkString
>           Try(Json.fromJson[Event](Json.parse(rec)).get) match {
>             case Success(event) => out.collect(event)
>             case Failure(exception) => println(s"Couldn't parse string: $rec, error: ${exception.toString}")
>           }
>         }
>
>         override def getProducedType: TypeInformation[Event] = createTypeInformation[Event]
>       })
>       .build,
>       WatermarkStrategy.noWatermarks[Event],
>       "kafka-source"
>     )
>     .keyBy(l => l.userId)
>     .print
>
>   env.execute("flink-kafka-source")
> }
> {code}
> I have created a topic in Kafka named "flink-connection".
>
> I am using a simple kafka-python producer to produce data to the
> flink-connection topic.
> !image-2022-04-22-05-52-04-760.png!
> I am able to consume data from Kafka in Flink.
> !image-2022-04-22-05-44-56-494.png!
> But I can't see the groupId in kafka-consumer-groups.
> !image-2022-04-22-05-46-45-592.png!
> Does anyone have any idea why the groupId is not being set?
>

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
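
A possible reading of the question about committing offsets, offered as a sketch and not as a confirmed diagnosis of this issue: as far as the Flink Kafka connector documentation describes it, KafkaSource commits offsets back to Kafka when a checkpoint completes, and when checkpointing is disabled it relies on the Kafka consumer's own periodic auto-commit, controlled by `enable.auto.commit` and `auto.commit.interval.ms`. The job in the report enables neither, so no offsets would be committed and the group would have nothing to show in kafka-consumer-groups. The helper name `OffsetCommitSettings.autoCommit` and the interval values below are assumptions made for illustration only.

```scala
import java.util.Properties

object OffsetCommitSettings {
  // Builds Kafka consumer properties that turn on the client's periodic
  // auto-commit. Helper name and interval are illustrative assumptions.
  def autoCommit(intervalMs: Long): Properties = {
    val props = new Properties()
    props.setProperty("enable.auto.commit", "true")
    props.setProperty("auto.commit.interval.ms", intervalMs.toString)
    props
  }
}

// Hypothetical wiring into the builder from the report (not executed here):
//   KafkaSource.builder[Event]
//     .setGroupId("test-group")
//     .setProperties(OffsetCommitSettings.autoCommit(1000L))
//
// Alternative: enable checkpointing so offsets are committed when a
// checkpoint completes (interval chosen arbitrarily):
//   env.enableCheckpointing(5000L)
```

With either change in place, `test-group` would be expected to appear in kafka-consumer-groups once the first commit has happened, though this has not been verified against the reporter's setup.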