[ 
https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686155#comment-17686155
 ] 

Shuiqiang Chen commented on FLINK-30922:
----------------------------------------

I have created a pull request to fixed the issue, Anyone who help review the PR 
will be highly appreciated.

> SQL validate fail in parsing writable metadata
> ----------------------------------------------
>
>                 Key: FLINK-30922
>                 URL: https://issues.apache.org/jira/browse/FLINK-30922
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.1
>            Reporter: tanjialiang
>            Priority: Major
>              Labels: pull-request-available
>
> When i tried an simple demo sql with writing metadata to the kafka in flink 
> sql client
> {code:java}
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> )
> INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
>  
> it will be throw an error
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
> statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
> Source) ~[?:?]
>         at 
> org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client-1.16.1.jar:1.16.1]
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. From line 1, column 33 to line 1, column 34: Unknown target column 
> 'ts'
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 33 to line 1, column 34: Unknown target column 'ts'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_41]
>         at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_41]
>         at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_41]
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
> ~[?:1.8.0_41]
>         at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) 
> ~[?:?]
>         at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) ~[?:?]
>         at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:401)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:389)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:172)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$$$Lambda$610/614335089.apply(Unknown
>  Source) ~[?:?]
>         at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at 
> scala.collection.TraversableLike$$Lambda$329/456314134.apply(Unknown Source) 
> ~[?:?]
>         at scala.collection.Iterator.foreach(Iterator.scala:937) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.Iterator.foreach$(Iterator.scala:937) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.IterableLike.foreach(IterableLike.scala:70) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.TraversableLike.map(TraversableLike.scala:233) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:226) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>  ~[?:?]
>         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown 
> target column 'ts'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_41]
>         at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_41]
>         at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_41]
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
> ~[?:1.8.0_41]
>         at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) 
> ~[?:?]
>         at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560) 
> ~[?:?]
>         at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) ~[?:?]
>         at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:401)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:389)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:172)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$$$Lambda$610/614335089.apply(Unknown
>  Source) ~[?:?]
>         at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at 
> scala.collection.TraversableLike$$Lambda$329/456314134.apply(Unknown Source) 
> ~[?:?]
>         at scala.collection.Iterator.foreach(Iterator.scala:937) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.Iterator.foreach$(Iterator.scala:937) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.IterableLike.foreach(IterableLike.scala:70) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.TraversableLike.map(TraversableLike.scala:233) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:226) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-scala_2.12-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:164)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:71)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:61)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:50)
>  ~[?:?]
>         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:118)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to