[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View

2020-10-20 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217535#comment-17217535
 ] 

shizhengchao commented on FLINK-19735:
--

Hi [~jark], my Flink version is 1.11.1. I found that this problem has been 
resolved in version 1.11.2; see FLINK-18750.

> TableFunction can not work in Flink View
> 
>
> Key: FLINK-19735
> URL: https://issues.apache.org/jira/browse/FLINK-19735
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.11.1
> Reporter: shizhengchao
> Priority: Major
>
> TableFunction does not work in Flink SQL when it is used inside a view. Here is my code:
> {code:sql}
> CREATE TABLE test (
>   myField   STRING,
>   name  STRING
> ) WITH (
>   'connector' = 'kafka-0.11',
>   'topic' = '',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'mygroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'csv'
> );
> CREATE TABLE print (
>   myField   STRING,
>   newWord   STRING,
>   newLength INT
> ) WITH (
>   'connector' = 'print'
> );
> CREATE VIEW test_view AS
> SELECT myField, newWord, newLength
> FROM test, LATERAL TABLE(SplitFunction(myField));
> INSERT INTO print
> SELECT * FROM test_view;
> {code}
> The function code is as follows:
> {code:java}
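> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.annotation.FunctionHint;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
> @FunctionHint(output = @DataTypeHint("ROW"))
> public class SplitFunction extends TableFunction<Row> {
>     // Emits one (word, length) row for each space-separated token of the input.
>     public void eval(String str) {
>         for (String s : str.split(" ")) {
>             collect(Row.of(s, s.length()));
>         }
>     }
> }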
> {code}
> Running the SQL causes an error:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
>   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
>   at com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
>   at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
>   at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
>   at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>   at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
>   at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
>   at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
>   at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
>   at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)

[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View

2020-10-20 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217527#comment-17217527
 ] 

shizhengchao commented on FLINK-19735:
--

Hi [~jark], currently TableFunction only takes effect in an "INSERT INTO" 
statement.
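
For illustration, a minimal sketch of the form this comment describes as working: the lateral join is written directly in the INSERT INTO statement instead of behind test_view. It reuses the table and function names from the issue description. It assumes SplitFunction declares its output fields as newWord and newLength, which the archived description does not show, so treat it as a possible workaround rather than a verified one:

{code:sql}
-- Same query as test_view, inlined into the INSERT statement.
-- Assumption: SplitFunction's output row exposes fields newWord and newLength.
INSERT INTO print
SELECT myField, newWord, newLength
FROM test, LATERAL TABLE(SplitFunction(myField));
{code}

The trade-off is that the query can no longer be shared through a view, so each statement that needs the split rows has to repeat the lateral join.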
