????????DataType??????????????????????????????????udf??????????

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;




public class UTC2Local extends ScalarFunction {
    public Timestamp eval(Timestamp s) {
        long timestamp = s.getTime() + 28800000;
        return new Timestamp(timestamp);
    }


}








------------------ ???????? ------------------
??????: "JingsongLee"<lzljs3620...@aliyun.com.INVALID>;
????????: 2019??9??5??(??????) ????11:55
??????: "user-zh"<user-zh@flink.apache.org>;

????: Re: flink1.9??blinkSQL??????udf??TIMESTAMP????????



????????DataType??????????????????
??????????????????<=3????????????DataTypes.TIMESTAMP(3)????????

Best,
Jingsong Lee


------------------------------------------------------------------
From:???? <346531...@qq.com>
Send Time:2019??9??5??(??????) 11:48
To:user-zh <user-zh@flink.apache.org>
Subject:flink1.9??blinkSQL??????udf??TIMESTAMP????????

??????????????????


??????????flink1.9????????flinkSQL????udf????????????????????????blinkSQL????????udf????????TIMESTAMP??????????udf??????????????????????????????+8??????????????????


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
        at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
        at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
        at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)

回复