执行如下代码提示异常,改为旧方法StreamTableEnvironment.registerFunction执行正常,
Flink version: 1.11.1

package com.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


public class TestUTDFOk {
    public static class UDTF extends TableFunction<Row> {

        public void eval(String input) {
            Row row = new Row(3);
            row.setField(0, input);
            row.setField(1, input.length());
            row.setField(2, input +  2);
            collect(row);
        }
    }

    public static  class UDF extends ScalarFunction {
        public String eval(Row row, Integer index) {
            try {
                return String.valueOf(row.getField(index));
            } catch (Exception e) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
EnvironmentSettings.newInstance().useBlinkPlanner().build());
//        tEnv.registerFunction("udtf", new UDTF());
//        tEnv.registerFunction("udf", new UDF());
        tEnv.createTemporarySystemFunction("udtf", new UDTF());
        tEnv.createTemporarySystemFunction("udf", new UDF());

        tEnv.createTemporaryView("source", tEnv.fromValues("a", "b",
"c").as("f0"));
        String sinkDDL = "create table sinkTable ("
                + "f0 String"
                + ", x String"
                + ", y String"
                + ", z String"
                + ") with ("
                + "    'connector.type' = 'filesystem',"
                + "    'format.type' = 'csv',"
                + "    'connector.path' =
'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'"
                + ")";
        String udtfCall = "insert into sinkTable SELECT S.f0"
                + ", udf(f1, 0) as x"
                + ", udf(f1, 1) as y"
                + ", udf(f1, 2) as z"
                + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)";

        tEnv.executeSql(sinkDDL);
        tEnv.executeSql(udtfCall);
    }
}

异常如下:
Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation failed.
An error occurred in the type inference logic of function 'udf'.
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.test.TestUTDFOk.main(TestUTDFOk.java:64)
Caused by: org.apache.flink.table.api.ValidationException: An error
occurred in the type inference logic of function 'udf'.
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
at java.util.Optional.flatMap(Optional.java:241)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99)
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 7 more
Caused by: org.apache.flink.table.api.ValidationException: Could not
extract a valid type inference for function class
'com.test.TestUTDFOk$UDF'. Please check for implementation mistakes
and/or provide a corresponding hint.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:160)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:85)
at 
org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:144)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:162)
... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Error in
extracting a signature to output mapping.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158)
... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to
extract a type inference from method:
public java.lang.String
com.test.TestUTDFOk$UDF.eval(org.apache.flink.types.Row,java.lang.Integer)
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110)
... 24 more
Caused by: org.apache.flink.table.api.ValidationException: Could not
extract a data type from 'class org.apache.flink.types.Row' in
parameter 0 of method 'eval' in class 'com.test.TestUTDFOk$UDF'.
Please pass the required data type manually or allow RAW types.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:147)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:396)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:375)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:375)
at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
at 
java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:376)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:354)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:314)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164)
... 25 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot
extract a data type from a pure 'org.apache.flink.types.Row' class.
Please use annotations to define field names and field types.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238)
... 45 more

Process finished with exit code 1

-- 
Best,
zz zhang

Reply via email to