[ 
https://issues.apache.org/jira/browse/FLINK-22689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunfeng Zhou updated FLINK-22689:
---------------------------------
    Description: 
I wrote the following program according to the example code provided in 
[Documentation/Table API/Row-based 
operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations]

{code:java}
public class TableUDF {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
​
        Table input = tEnv.fromValues(
                DataTypes.of("ROW<c STRING>"),
                Row.of("name")
        );
​
        ScalarFunction func = new MyMapFunction();
        tEnv.registerFunction("func", func);
​
        Table table = input
                .map(call("func", $("c")).as("a", "b")); // exception occurs 
here
​
        table.execute().print();
    }
​
    public static class MyMapFunction extends ScalarFunction {
        public Row eval(String a) {
            return Row.of(a, "pre-" + a);
        }
​
        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }
}
{code}

The code above would throw an exception like this:

{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Only 
a scalar function can be used in the map operator.
  at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480)
  at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519)
  at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29)
{code}


  The core of the program above is identical to that provided in flink 
documentation, but it cannot function correctly. This might affect users who 
want to use custom function with table API.

 

  was:
I wrote the following program according to the example code provided in 
[Documentation/Table API/Row-based 
operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations]
public class TableUDF {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
​
        Table input = tEnv.fromValues(
                DataTypes.of("ROW<c STRING>"),
                Row.of("name")
        );
​
        ScalarFunction func = new MyMapFunction();
        tEnv.registerFunction("func", func);
​
        Table table = input
                .map(call("func", $("c")).as("a", "b")); // exception occurs 
here
​
        table.execute().print();
    }
​
    public static class MyMapFunction extends ScalarFunction {
        public Row eval(String a) {
            return Row.of(a, "pre-" + a);
        }
​
        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }
}
The code above would throw an exception like this:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Only 
a scalar function can be used in the map operator.
  at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480)
  at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519)
  at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29)
  The core of the program above is identical to that provided in flink 
documentation, but it cannot function correctly. This might affect users who 
want to use custom function with table API.

 


> Table API Documentation Row-Based Operations Example Fails
> ----------------------------------------------------------
>
>                 Key: FLINK-22689
>                 URL: https://issues.apache.org/jira/browse/FLINK-22689
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem
>    Affects Versions: 1.12.1
>            Reporter: Yunfeng Zhou
>            Priority: Major
>
> I wrote the following program according to the example code provided in 
> [Documentation/Table API/Row-based 
> operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations]
> {code:java}
> public class TableUDF {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> ​
>         Table input = tEnv.fromValues(
>                 DataTypes.of("ROW<c STRING>"),
>                 Row.of("name")
>         );
> ​
>         ScalarFunction func = new MyMapFunction();
>         tEnv.registerFunction("func", func);
> ​
>         Table table = input
>                 .map(call("func", $("c")).as("a", "b")); // exception occurs 
> here
> ​
>         table.execute().print();
>     }
> ​
>     public static class MyMapFunction extends ScalarFunction {
>         public Row eval(String a) {
>             return Row.of(a, "pre-" + a);
>         }
> ​
>         @Override
>         public TypeInformation<?> getResultType(Class<?>[] signature) {
>             return Types.ROW(Types.STRING, Types.STRING);
>         }
>     }
> }
> {code}
> The code above would throw an exception like this:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Only a scalar function can be used in the map operator.
>   at 
> org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480)
>   at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519)
>   at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29)
> {code}
>   The core of the program above is identical to that provided in flink 
> documentation, but it cannot function correctly. This might affect users who 
> want to use custom function with table API.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to