[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View
[ https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217535#comment-17217535 ] shizhengchao commented on FLINK-19735: -- Hi [~jark], my flink version is 1.11.1, I found that this problem has been resolved in version 1.11.2, see FLINK-18750 > TableFunction can not work in Flink View > > > Key: FLINK-19735 > URL: https://issues.apache.org/jira/browse/FLINK-19735 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: shizhengchao >Priority: Major > > TableFunction can't be work in Flink Sql. Here is my code: > {code:sql} > CREATE TABLE test ( > myField STRING, > name STRING > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = '', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'mygroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'csv' > ); > CREATE TABLE print ( > myField STRING, > newWord STRING, > newLength INT > ) WITH ( > 'connector' = 'print' > ); > CREATE VIEW test_view AS > SELECT myField, newWord, newLength FROM test, LATERAL > TABLE(SplitFunction(myField)); > INSERT INTO print > SELECT * FROM test_view; > {code} > And the function code as this: > {code:java} > @FunctionHint(output = @DataTypeHint("ROW")) > public class SplitFunction extends TableFunction { > public void eval(String str) { > for (String s : str.split(" ")) { > collect(Row.of(s, s.length())); > } > } > } > {code} > run the sql,cause an error: > {code} > Unable to find source-code formatter for language: log. Available languages > are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, > groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, > perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, > yamlException in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. From > line 2, column 17 to line 2, column 23: Column 'newWord' not found in any > table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) > at > com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208) > at > com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200) > at > com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129) > at > com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, > column 17 to line 2, column 23: Column 'newWord' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.jav
[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View
[ https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217527#comment-17217527 ] shizhengchao commented on FLINK-19735: -- Hi, [~jark] , currently, TableFunction only work effect in "INSERT INTO" statement > TableFunction can not work in Flink View > > > Key: FLINK-19735 > URL: https://issues.apache.org/jira/browse/FLINK-19735 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: shizhengchao >Priority: Major > > TableFunction can't be work in Flink Sql. Here is my code: > {code:sql} > CREATE TABLE test ( > myField STRING, > name STRING > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = '', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'mygroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'csv' > ); > CREATE TABLE print ( > myField STRING, > newWord STRING, > newLength INT > ) WITH ( > 'connector' = 'print' > ); > CREATE VIEW test_view AS > SELECT myField, newWord, newLength FROM test, LATERAL > TABLE(SplitFunction(myField)); > INSERT INTO print > SELECT * FROM test_view; > {code} > And the function code as this: > {code:java} > @FunctionHint(output = @DataTypeHint("ROW")) > public class SplitFunction extends TableFunction { > public void eval(String str) { > for (String s : str.split(" ")) { > collect(Row.of(s, s.length())); > } > } > } > {code} > run the sql,cause an error: > {code} > Unable to find source-code formatter for language: log. Available languages > are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, > groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, > perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, > yamlException in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. From > line 2, column 17 to line 2, column 23: Column 'newWord' not found in any > table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) > at > com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208) > at > com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200) > at > com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129) > at > com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, > column 17 to line 2, column 23: Column 'newWord' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574) > at > org.apache.calc