jingzi created FLINK-25344: ------------------------------ Summary: flink kafka connector Property group.id is required when using committed offset for offsets initializer Key: FLINK-25344 URL: https://issues.apache.org/jira/browse/FLINK-25344 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0 Reporter: jingzi
Create a Kafka table: ``` {{CREATE TABLE KafkaTable ( }} {{`user_id` BIGINT,`item_id` BIGINT,}} {{ `behavior` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',}} {{ 'topic' = 'user_behavior',}} {{ 'properties.bootstrap.servers' = 'localhost:9092',}} {{'format' = 'csv')}} ``` Remark:The ```{{{}properties.group.id{}}}``` is unset , Execute a query: ```select * from {{KafkaTable;```}} {{}} Log is : ``` java.lang.IllegalStateException: Property group.id is required when using committed offset for offsets initializer at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) at org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.validate(ReaderHandledOffsetsInitializer.java:76) at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.sanityCheck(KafkaSourceBuilder.java:501) at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.build(KafkaSourceBuilder.java:403) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaSource(KafkaDynamicSource.java:416) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:216) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:684) at com.chehejia.dmp.JobApplication.kafkaGroupOption(JobApplication.java:686) at com.chehejia.dmp.JobApplication.explainDmlForOption(JobApplication.java:458) at com.chehejia.dmp.JobApplication.main(JobApplication.java:120) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) ``` -- This message was sent by Atlassian Jira (v8.20.1#820001)