[ https://issues.apache.org/jira/browse/FLINK-16800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhenghua Gao updated FLINK-16800: --------------------------------- Summary: TypeMappingUtils#checkIfCompatible didn't deal with nested types (was: TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types) > TypeMappingUtils#checkIfCompatible didn't deal with nested types > ---------------------------------------------------------------- > > Key: FLINK-16800 > URL: https://issues.apache.org/jira/browse/FLINK-16800 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.10.0 > Reporter: Zhenghua Gao > Priority: Major > Fix For: 1.11.0 > > > the planner will use TypeMappingUtils#checkPhysicalLogicalTypeCompatible to > validate logical schema and physical schema are compatible when translate > CatalogSinkModifyOperation to Calcite relational expression. The validation > didn't deal with nested types well, which could expose the following > ValidationException: > {code:java} > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type ARRAY<ROW<`logistics_status` DECIMAL(38, 18)>> of table field 'old' > does not match with the physical type ARRAY<ROW<`logistics_status` > LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return > type. > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157) > at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214) > at > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192) > at > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > at > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351) > at > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259) > at com.akulaku.data.main.StreamMain.main(StreamMain.java:58) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)