flink版本是1.11.1

创建了一个udf函数如下


public class FromUnixTime extends ScalarFunction {

  private static final Logger logger =
LoggerFactory.getLogger(FromUnixTime.class);

  public String eval(long unixTime, String timeZone, String format) {
    try {
      DateFormat dateFormat = new SimpleDateFormat(format);
      dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
      return dateFormat.format(new Date(unixTime));
    } catch (Exception e) {
      logger.error("parse unixTime error, unixTime: " + unixTime +
",timeZone: " + timeZone, ",format:" + format, e);
      return null;
    }
  }

  public String eval(long unixTime, String timeZone) {
    return eval(unixTime, timeZone, "yyyy-MM-dd HH:mm:ss");
  }
}



测试执行如下语句是能正常成功的。

<http://apache-flink.147419.n8.nabble.com/file/t572/lALPDiCptXS2iL3NAUbNBPo_1274_326.jpg>
 


但是创建view报如下错误,是啥原因?

create view xx as
select 
  *,
UDF_FromUnixTime(exeAt, 'utc', 'yyyy-MM-ddHH:mm' ) as biz_min
from timeout_topic_source;

其中exeAt是个long类型的时间

错误如下:

org.apache.flink.table.api.ValidationException: SQL validation failed.
Invalid function call:
default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL,
CHAR(15) NOT NULL)
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:148)
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
        at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
        at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCreateView(FlinkSqlInterrpeter.java:343)
        at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:267)
        at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:145)
        at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:114)
        at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
        at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
        at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
        at
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
        at
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function
call:
default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL,
CHAR(15) NOT NULL)
        at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:207)
        at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:93)
        at
org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:684)
        at
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:448)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:314)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
        at 
org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
        at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:143)
        ... 17 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments. Expected signatures are:
default_catalog.default_database.UDF_FromUnixTime(unixTime BIGINT NOT NULL,
timeZone STRING)
default_catalog.default_database.UDF_FromUnixTime(unixTime BIGINT NOT NULL,
timeZone STRING, format STRING)
        at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
        at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
        at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
        ... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments.
        at
org.apache.flink.table.types.inference.TypeInferenceUtil.inferInputTypes(TypeInferenceUtil.java:467)
        at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:123)
        at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
        at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
        ... 44 more




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复