[
https://issues.apache.org/jira/browse/FLINK-18339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu closed FLINK-18339.
---------------------------
Resolution: Duplicate
Looks like this is a duplicate of FLINK-16622.
> ValidationException: field TypeInformation in TableSchema does not match the
> TableSource return type (blink planner)
> -----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18339
> URL: https://issues.apache.org/jira/browse/FLINK-18339
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.9.0
> Reporter: hehuiyuan
> Priority: Minor
> Attachments: image-2020-06-17-10-37-48-166.png,
> image-2020-06-17-10-53-08-424.png, image-2020-06-17-15-24-16-290.png,
> image-2020-06-17-15-24-24-093.png
>
>
> The type of the `datatime` field is OBJECT_ARRAY<STRING>.
>
> Exception:
>
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match with type BasicArrayTypeInfo<String> of the field 'datatime' of the TableSource return type.
>     at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>     at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>     at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
> {code}
>
> Usage:
> `fieldNames` is the field name array.
> `fieldsType` is the field type array.
>
> We can obtain the field TypeInformation as follows:
>
> {code:java}
> TypeInformation typeInformation = TypeStringUtils.readTypeInfo("OBJECT_ARRAY<STRING>");
> {code}
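>
> For context, a minimal sketch of how the parallel arrays used below could be assembled (the field names and the first three types are assumptions taken from the Row string in (1), not from the reporter's actual code):
>
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.table.utils.TypeStringUtils;
>
> // Hypothetical arrays mirroring the reported schema; per this report,
> // "OBJECT_ARRAY<STRING>" is parsed into a BasicArrayTypeInfo<String>.
> String[] fieldNames = {"name", "age", "sex", "datatime"};
> TypeInformation<?>[] fieldsType = {
>     Types.STRING,
>     Types.INT,
>     Types.STRING,
>     TypeStringUtils.readTypeInfo("OBJECT_ARRAY<STRING>")
> };
> {code}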
>
>
> {code:java}
> ConnectTableDescriptor d = descriptor
>     .withFormat(
>         new Csv()
>             .fieldDelimiter(fielddelimiter)
>             .schema(new RowTypeInfo(fieldsType, fieldNames)))
>     .withSchema(schema);
> {code}
>
> (1) Calling toString() on RowTypeInfo(fieldsType, fieldNames) yields:
> Row(name: String, age: Integer, sex: String, datatime: BasicArrayTypeInfo<String>)
> The `datatime` field type is BasicArrayTypeInfo<String>.
>
> (2) Schema schema (see the sketch after the image):
> schema = schema.field(fieldNames[i], fieldsType[i]);
> The `datatime` field type is BasicArrayTypeInfo<String>.
> !image-2020-06-17-10-37-48-166.png!
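>
> The construction in (2) can be sketched as a loop (assuming the descriptor-based org.apache.flink.table.descriptors.Schema and the arrays from the sketch above):
>
> {code:java}
> import org.apache.flink.table.descriptors.Schema;
>
> // Each field is registered with its TypeInformation; TableSchema later
> // converts it to a DataType via fromLegacyInfoToDataType (see analysis below).
> Schema schema = new Schema();
> for (int i = 0; i < fieldNames.length; i++) {
>     schema = schema.field(fieldNames[i], fieldsType[i]);
> }
> {code}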
>
> Code analysis:
>
> `schemaBuilder.field(name, type)` is called when the TableSchema is created:
> {code:java}
> public Builder field(String name, TypeInformation<?> typeInfo) {
>     return field(name, fromLegacyInfoToDataType(typeInfo));
> }
>
> public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) {
>     return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo);
> }
>
> public static DataType toDataType(TypeInformation<?> typeInfo) {
>     // time indicators first as their hashCode/equals is shared with those of regular timestamps
>     if (typeInfo instanceof TimeIndicatorTypeInfo) {
>         return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo);
>     }
>
>     final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo);
>     if (foundDataType != null) {
>         return foundDataType;
>     }
>
>     if (typeInfo instanceof RowTypeInfo) {
>         return convertToRowType((RowTypeInfo) typeInfo);
>     }
>     else if (typeInfo instanceof ObjectArrayTypeInfo) {
>         return convertToArrayType(
>             typeInfo.getTypeClass(),
>             ((ObjectArrayTypeInfo) typeInfo).getComponentInfo());
>     }
>     else if (typeInfo instanceof BasicArrayTypeInfo) {
>         return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
>     }
>     else if (typeInfo instanceof MultisetTypeInfo) {
>         return convertToMultisetType(((MultisetTypeInfo) typeInfo).getElementTypeInfo());
>     }
>     else if (typeInfo instanceof MapTypeInfo) {
>         return convertToMapType((MapTypeInfo) typeInfo);
>     }
>     else if (typeInfo instanceof CompositeType) {
>         return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo);
>     }
>
>     return createLegacyType(LogicalTypeRoot.ANY, typeInfo);
> }
> {code}
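>
> The asymmetry between the two array branches can be illustrated directly (a hedged sketch against Flink 1.9; the results in the comments follow from the branches quoted above):
>
> {code:java}
> import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
>
> // ObjectArrayTypeInfo takes the convertToArrayType branch -> a proper ARRAY<STRING>.
> DataType objectArray = LegacyTypeInfoDataTypeConverter.toDataType(
>     ObjectArrayTypeInfo.getInfoFor(Types.STRING));
>
> // BasicArrayTypeInfo takes the createLegacyType branch -> a LEGACY(ARRAY) wrapper,
> // i.e. the LEGACY(BasicArrayTypeInfo<String>) reported in the exception.
> DataType basicArray = LegacyTypeInfoDataTypeConverter.toDataType(
>     BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
> {code}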
> If the TypeInformation is a BasicArrayTypeInfo, the following branch is taken:
> {code:java}
> else if (typeInfo instanceof BasicArrayTypeInfo) {
>     return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
> }
>
> private static DataType createLegacyType(LogicalTypeRoot typeRoot, TypeInformation<?> typeInfo) {
>     return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, typeInfo))
>         .bridgedTo(typeInfo.getTypeClass());
> }
> {code}
> So the `datatime` field type in the TableSchema becomes LEGACY(BasicArrayTypeInfo<String>), while the TableSource return type still reports BasicArrayTypeInfo<String>; computeIndexMapping then rejects the pair and throws the ValidationException above.
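>
> A possible workaround until FLINK-16622 is fixed (an untested sketch, not a confirmed fix): declare the array field with an ObjectArrayTypeInfo, which converts to a plain ARRAY<STRING> on the schema side and so should line up with the source's return type:
>
> {code:java}
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
>
> // Untested sketch: ObjectArrayTypeInfo goes through convertToArrayType instead
> // of createLegacyType, avoiding the LEGACY wrapping that triggers the mismatch.
> fieldsType[3] = ObjectArrayTypeInfo.getInfoFor(Types.STRING);
> {code}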
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)