Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6218#discussion_r199791671
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
    @@ -201,71 +202,69 @@ private Object convert(Schema schema, 
TypeInformation<?> info, Object object) {
                switch (schema.getType()) {
                        case RECORD:
                                if (object instanceof IndexedRecord) {
    -                                   return convertRecord(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
    +                                   return convertAvroRecordToRow(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
                                }
                                throw new IllegalStateException("IndexedRecord 
expected but was: " + object.getClass());
                        case ENUM:
                        case STRING:
                                return object.toString();
                        case ARRAY:
                                if (info instanceof BasicArrayTypeInfo) {
    -                                   final BasicArrayTypeInfo<?, ?> bati = 
(BasicArrayTypeInfo<?, ?>) info;
    -                                   final TypeInformation<?> elementInfo = 
bati.getComponentInfo();
    -                                   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
    +                                   final TypeInformation<?> elementInfo = 
((BasicArrayTypeInfo<?, ?>) info).getComponentInfo();
    +                                   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
                                } else {
    -                                   final ObjectArrayTypeInfo<?, ?> oati = 
(ObjectArrayTypeInfo<?, ?>) info;
    -                                   final TypeInformation<?> elementInfo = 
oati.getComponentInfo();
    -                                   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
    +                                   final TypeInformation<?> elementInfo = 
((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo();
    +                                   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
                                }
                        case MAP:
    -                           final MapTypeInfo<?, ?> mti = (MapTypeInfo<?, 
?>) info;
    +                           final MapTypeInfo<?, ?> mapTypeInfo = 
(MapTypeInfo<?, ?>) info;
                                final Map<String, Object> convertedMap = new 
HashMap<>();
                                final Map<?, ?> map = (Map<?, ?>) object;
                                for (Map.Entry<?, ?> entry : map.entrySet()) {
                                        convertedMap.put(
                                                entry.getKey().toString(),
    -                                           convert(schema.getValueType(), 
mti.getValueTypeInfo(), entry.getValue()));
    +                                           
convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), 
entry.getValue()));
                                }
                                return convertedMap;
                        case UNION:
                                final List<Schema> types = schema.getTypes();
                                final int size = types.size();
                                final Schema actualSchema;
                                if (size == 2 && types.get(0).getType() == 
Schema.Type.NULL) {
    -                                   return convert(types.get(1), info, 
object);
    +                                   return convertAvroType(types.get(1), 
info, object);
                                } else if (size == 2 && types.get(1).getType() 
== Schema.Type.NULL) {
    -                                   return convert(types.get(0), info, 
object);
    +                                   return convertAvroType(types.get(0), 
info, object);
                                } else if (size == 1) {
    -                                   return convert(types.get(0), info, 
object);
    +                                   return convertAvroType(types.get(0), 
info, object);
                                } else {
                                        // generic type
                                        return object;
                                }
                        case FIXED:
                                final byte[] fixedBytes = ((GenericFixed) 
object).bytes();
                                if (info == Types.BIG_DEC) {
    -                                   return convertDecimal(schema, 
fixedBytes);
    +                                   return convertToDecimal(schema, 
fixedBytes);
                                }
                                return fixedBytes;
                        case BYTES:
    -                           final ByteBuffer bb = (ByteBuffer) object;
    -                           bb.position(0);
    -                           final byte[] bytes = new byte[bb.remaining()];
    -                           bb.get(bytes);
    +                           final ByteBuffer byteBuffer = (ByteBuffer) 
object;
    +                           byteBuffer.position(0);
    --- End diff --
    
    I will remove the `byteBuffer.position(0)` call. The tests succeed both with and without it.


---

Reply via email to