twalthr commented on a change in pull request #10503: [FLINK-15137][avro] 
Improve schema derivation for Avro format
URL: https://github.com/apache/flink/pull/10503#discussion_r359214883
 
 

 ##########
 File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
 ##########
 @@ -157,4 +209,162 @@ private AvroSchemaConverter() {
                }
                throw new IllegalArgumentException("Unsupported Avro type '" + 
schema.getType() + "'.");
        }
+
+       private static LogicalType convertToLogicalType(Schema schema) {
+               return convertToDataType(schema).getLogicalType();
+       }
+
+       private static DataType convertToDataType(Schema schema) {
+               switch (schema.getType()) {
+                       case RECORD:
+                               final List<Schema.Field> fields = 
schema.getFields();
+                               final DataTypes.Field[] dataTypeFields = new 
DataTypes.Field[fields.size()];
+                               for (int i = 0; i < fields.size(); i++) {
+                                       final Schema.Field field = 
fields.get(i);
+                                       dataTypeFields[i] = DataTypes.FIELD(
+                                               field.name(),
+                                               
convertToDataType(field.schema()));
+                               }
+                               return DataTypes.ROW(dataTypeFields);
+                       case ENUM:
+                       case STRING:
+                               // convert Avro's Utf8/CharSequence to String
+                               return DataTypes.STRING();
+                       case ARRAY:
+                               // result type might either be 
ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
+                               return 
DataTypes.ARRAY(convertToDataType(schema.getElementType()));
+                       case MAP:
+                               return DataTypes.MAP(DataTypes.STRING(), 
convertToDataType(schema.getValueType()));
+                       case UNION:
+                               final Schema actualSchema;
+                               if (schema.getTypes().size() == 2 && 
schema.getTypes().get(0).getType() == Schema.Type.NULL) {
 
 Review comment:
   How do we deal with nullability in Avro? We have a type system that supports 
nullability now. And Avro also has this concept. We should move this part to a 
separate method and call `notNull()`,`nullable()` on the resulting data type.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to