Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193039834

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    +           } else if (genericTypeInfo.getTypeClass() == List.class &&
    +               schema.getType() == Schema.Type.ARRAY) {
    +               return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
    --- End diff --

    Yes, org.apache.flink.table.api.Types doesn't support LIST, but org.apache.flink.api.common.typeinfo.Types does. The Avro array type would be converted to the Java List type. Can we add LIST to org.apache.flink.table.api.Types to support Avro arrays?
---