[ https://issues.apache.org/jira/browse/FLINK-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037445#comment-17037445 ]
Jark Wu commented on FLINK-16068:
---------------------------------

cc [~danny0405], could you help take a look at this? It seems there is a bug in the SQL parser when a column name is an escaped keyword and the table definition also contains a computed column.

> table with keyword-escaped columns and computed_column_expression columns
> -------------------------------------------------------------------------
>
>                 Key: FLINK-16068
>                 URL: https://issues.apache.org/jira/browse/FLINK-16068
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: pangliang
>            Priority: Major
>
> I use sql-client to create a table with a keyword-escaped column and a computed_column_expression column, like this:
> {code:java}
> CREATE TABLE source_kafka (
>     log STRING,
>     `time` BIGINT,
>     pt as proctime()
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'k8s-logs',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.properties.zookeeper.connect' = 'zk-1.zk:2181,zk-2.zk:2181,zk-3.zk:2181/kafka',
>   'connector.properties.bootstrap.servers' = 'kafka.default:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'format.type'='json',
>   'format.fail-on-missing-field' = 'true',
>   'update-mode' = 'append'
> );
> {code}
> Then I simply queried it:
> {code:java}
> SELECT * from source_kafka limit 10;{code}
> and got an exception:
> {code:java}
> java.io.IOException: Fail to run stream sql job
>   at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:164)
>   at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callSelect(FlinkStreamSqlInterpreter.java:108)
>   at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:203)
>   at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
>   at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:104)
>   at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:103)
>   at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:676)
>   at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:569)
>   at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
>   at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:121)
>   at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time" at line 1, column 12.
> Was expecting one of:
>     "ABS" ...
>     "ARRAY" ...
>     "AVG" ...
>     "CARDINALITY" ...
>     "CASE" ...
>     "CAST" ...
>     "CEIL" ...
>     "CEILING" ...
>     ......
>
>   at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>   at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>   at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>   at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:104)
>   ... 13 more
> {code}
> I also ran some tests. The following definitions work:
> {code:java}
> CREATE TABLE source_kafka (
>     log STRING,
>     `aaaaa` BIGINT,
>     pt as proctime()
> )
>
> CREATE TABLE source_kafka (
>     log STRING,
>     `time` BIGINT
> )
>
> CREATE TABLE source_kafka (
>     log STRING,
>     pt as proctime()
> ){code}
> When a computed column is present, these escaped column names do not work:
> `time`, `select`, `string`

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
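
The `Caused by` entry in the trace above reports `time` at line 1, column 12, raised from `CalciteParser.parse` as called by `SqlExprToRexConverterImpl.convertToRexNodes`. One possible reading, which is only an assumption and is not confirmed anywhere in this message, is that when a computed column is present the column names are turned back into SQL text without their backticks and parsed a second time. The Python sketch below is not Flink code: `quote`, `build_projection`, `first_bare_keyword`, the `RESERVED` set and the comma-only join are all hypothetical and exist only to illustrate that idea.

```python
import re

# Hypothetical illustration only; none of these names exist in Flink.
RESERVED = {"time", "select", "string"}  # the three names reported as failing
PREFIX = "SELECT "


def quote(identifier):
    """Wrap an identifier in backticks, doubling any embedded backtick."""
    return "`" + identifier.replace("`", "``") + "`"


def build_projection(expressions, escape):
    """Join column expressions into a SELECT list (assumed comma-only join).

    Plain names are quoted when escape is True; anything that is not a bare
    name (for example a call such as proctime()) is left untouched.
    """
    rendered = [quote(e) if escape and e.isidentifier() else e for e in expressions]
    return PREFIX + ",".join(rendered)


def first_bare_keyword(sql):
    """Return (word, 1-based column) of the first unquoted reserved word, else None."""
    # Blank out backtick-quoted identifiers while preserving character positions.
    masked = re.sub(r"`(?:[^`]|``)*`", lambda m: " " * len(m.group()), sql)
    for match in re.finditer(r"[A-Za-z_]\w*", masked):
        if match.start() < len(PREFIX):
            continue  # skip the leading SELECT keyword itself
        if match.group().lower() in RESERVED:
            return match.group(), match.start() + 1
    return None


columns = ["log", "time", "proctime()"]  # the columns from the reported DDL

unescaped = build_projection(columns, escape=False)
print(unescaped)                      # SELECT log,time,proctime()
print(first_bare_keyword(unescaped))  # ('time', 12)

escaped = build_projection(columns, escape=True)
print(escaped)                        # SELECT `log`,`time`,proctime()
print(first_bare_keyword(escaped))    # None
```

Under these assumptions the unescaped string places `time` at column 12, which lines up with the position in the error message. That is suggestive rather than conclusive, and it would also be consistent with the report that the same table works once the computed column is removed.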