[ https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17685762#comment-17685762 ]
Shuiqiang Chen commented on FLINK-30922: ---------------------------------------- Hi [~tanjialiang], thank you for reporting the issue. I have reproduced the same error with the code you provided. According to [FLIP-107|https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors], it is possible to write metadata columns in SQL. The error is caused by a bug in appendPartitionAndNullsProjects in PreValidateReWriter: it excludes all computed columns and all metadata columns, whereas it is expected to include all persisted columns. I would like to fix it. > SQL validate fail in parsing writable metadata > ---------------------------------------------- > > Key: FLINK-30922 > URL: https://issues.apache.org/jira/browse/FLINK-30922 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.16.1 > Reporter: tanjialiang > Priority: Major > > When I tried a simple demo SQL statement that writes metadata to Kafka in the Flink > SQL client > {code:java} > CREATE TABLE KafkaTable ( > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING, > `ts` TIMESTAMP(3) METADATA FROM 'timestamp' > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'csv' > ) > INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code} > > it throws an error > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse > statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > 
org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown > Source) ~[?:?] > at > org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) > [flink-sql-client-1.16.1.jar:1.16.1] > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. From line 1, column 33 to line 1, column 34: Unknown target column > 'ts' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] 
> at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 13 more > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 33 to line 1, column 34: Unknown target column 'ts' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:1.8.0_41] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_41] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_41] > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > ~[?:1.8.0_41] > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) > ~[?:?] > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) ~[?:?] > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:401) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:389) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:172) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$$$Lambda$610/614335089.apply(Unknown > Source) ~[?:?] > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at > scala.collection.TraversableLike$$Lambda$329/456314134.apply(Unknown Source) > ~[?:?] 
> at scala.collection.Iterator.foreach(Iterator.scala:937) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.Iterator.foreach$(Iterator.scala:937) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50) > ~[?:?] > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] 
> at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 13 more > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown > target column 'ts' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:1.8.0_41] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_41] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_41] > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > ~[?:1.8.0_41] > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) > ~[?:?] > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560) > ~[?:?] > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) ~[?:?] > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:401) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:389) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:172) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$$$Lambda$610/614335089.apply(Unknown > Source) ~[?:?] > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at > scala.collection.TraversableLike$$Lambda$329/456314134.apply(Unknown Source) > ~[?:?] 
> at scala.collection.Iterator.foreach(Iterator.scala:937) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.Iterator.foreach$(Iterator.scala:937) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > ~[flink-scala_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61) > ~[?:?] > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50) > ~[?:?] > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] 
> at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 13 more{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)