[ 
https://issues.apache.org/jira/browse/FLINK-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014333#comment-17014333
 ] 

Benoit Hanotte commented on FLINK-15574:
----------------------------------------

I created the following PR to remove the deprcated logic in 
`LogicalTypeDataTypeConverter`: https://github.com/apache/flink/pull/10843
It is not clear to me why this logic, while deprecated, was kept.

> DataType to LogicalType conversion issue
> ----------------------------------------
>
>                 Key: FLINK-15574
>                 URL: https://issues.apache.org/jira/browse/FLINK-15574
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Benoit Hanotte
>            Priority: Major
>         Attachments: 0001-FLINK-15574-Add-unit-test-to-reproduce-issue.patch
>
>
> We seem to be encountering an issue with the conversion from DataType to 
> LogicalType with the Blink planner (full stacktrace below):
> {code}
> org.apache.flink.table.api.ValidationException: Type 
> LEGACY(BasicArrayTypeInfo<String>) of table field 'my_array' does not match 
> with type BasicArrayTypeInfo<String> of the field 'my_array' of the 
> TableSource return type.
> {code}
> It seems there exists 2 paths to do the conversion from DataType to 
> LogicalType:
> 1. TypeConversions.fromLegacyInfoToDataType():
> used for instance when calling TableSchema.fromTypeInformation().
> 2.  LogicalTypeDataTypeConverter.fromDataTypeToLogicalType():
> Deprecated but still used in TableSourceUtil and many other places.
> These 2 code paths can return a different LogicalType for the same input, 
> leading to issues when the LogicalTypes are compared to ensure they are 
> compatible.  For instance, PlannerTypeUtils.isAssignable() returns false for 
> a DataType created from BasicArrayTypeInfo (leading to the 
> ValidationException above).
> The full stacktrace is the following:
> {code}
> org.apache.flink.table.api.ValidationException: Type 
> LEGACY(BasicArrayTypeInfo<String>) of table field 'my_array' does not match 
> with type BasicArrayTypeInfo<String> of the field 'my_array' of the 
> TableSource return type.
>       at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>       at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>       at 
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion.translateToPlanInternal(StreamExecUnion.scala:85)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion.translateToPlanInternal(StreamExecUnion.scala:39)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion.translateToPlan(StreamExecUnion.scala:39)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:135)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:52)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlan(StreamExecWindowJoin.scala:52)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>       at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
>       at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:104)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to