[ https://issues.apache.org/jira/browse/FLINK-17224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092992#comment-17092992 ]
Danny Chen commented on FLINK-17224: ------------------------------------ [~dwysakowicz] I have fixed the TIME type conversion between our LogicalType and RelDataType [1], but there are still 2 problems: 1. we only support TIME type with precision 0 for Blink planner runtime 2. we do not support type conversion of different TIME type yet, either from "CAST" expression or output conversion, i.e. from TIME(2) to TIME(1) [1] https://github.com/danny0405/flink/commit/20224ac7203599070105aad34e2b7b74c621f867 > Precision of TIME type does not work correctly > ---------------------------------------------- > > Key: FLINK-17224 > URL: https://issues.apache.org/jira/browse/FLINK-17224 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: Dawid Wysakowicz > Assignee: Danny Chen > Priority: Critical > > The support for precision in TIME type does not work correctly causing many > different often cryptic problems. > Precision is completely ignored in {{FlinkTypeFactory:440-446}}: > {code} > case TIME => > if (relDataType.getPrecision > 3) { > throw new TableException( > s"TIME precision is not supported: ${relDataType.getPrecision}") > } > // blink runner support precision 3, but for consistent with flink > runner, we set to 0. > new TimeType() > {code} > Example problem: > {code} > @Test > public void testTimeScalarFunction() throws Exception { > int nanoOfDay = 10 * 1_000_000; > final List<Row> sourceData = Collections.singletonList( > Row.of(LocalTime.ofNanoOfDay(nanoOfDay)) > ); > final List<Row> sinkData = Arrays.asList( > Row.of(nanoOfDay) > ); > TestCollectionTableFactory.reset(); > TestCollectionTableFactory.initData(sourceData); > tEnv().sqlUpdate("CREATE TABLE SourceTable(s TIME(2)) WITH ('connector' > = 'COLLECTION')"); > tEnv().sqlUpdate("CREATE TABLE SinkTable(s BIGINT) WITH ('connector' = > 'COLLECTION')"); > tEnv().from("SourceTable") > .select(call(new TimeScalarFunction(), $("s"))) > .insertInto("SinkTable"); > tEnv().execute("Test Job"); > assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData)); > } > public static class TimeScalarFunction extends ScalarFunction { > public Long eval(@DataTypeHint("TIME(1)") LocalTime time) { > return time.toNanoOfDay(); > } > } > {code} > fails with: > {code} > org.apache.flink.table.api.ValidationException: Invalid function call: > org$apache$flink$table$planner$runtime$stream$table$FunctionITCase$TimeScalarFunction$a19cd231ba10cbbc0b55ebeda49e2a77(TIME(0)) > at > org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:198) > at > org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:73) > at > org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:486) > at > org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:277) > at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:576) > at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:583) > at > org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule.convert(FunctionDefinitionConvertRule.java:67) > at > org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97) > at > org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72) > at > org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122) > at > org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:681) > at > org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:128) > at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$2(QueryOperationConverter.java:487) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:488) > at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152) > at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:148) > at > org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) > at > org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145) > at > org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:127) > at > org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46) > at > org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) > at > org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:176) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:188) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:761) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:753) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:720) > at > org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.testTimeScalarFunction(FunctionITCase.java:151) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at > org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > Caused by: org.apache.flink.table.api.ValidationException: Could not infer an > output type for the given arguments. > at > org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType(TypeInferenceUtil.java:146) > at > org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnTypeOrError(TypeInferenceReturnInference.java:82) > at > org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:70) > ... 74 more > {code} > The problem is that after an input type inference we apply a cast {{cast("a" > AS TIME(1))}} , but when to and from {{RelDataType}} this expression is > converted to {{cast("a" AS TIME(0))}} and we end up again with wrong type > which results in failling {{MappingTypeStrategy}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)