dawidwys commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r511842204



##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -417,4 +431,11 @@ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
                }
                return builder;
        }
+
+       /** Returns schema with nullable true. */
+       private static Schema nullableSchema(Schema schema) {

Review comment:
       Could we stick to a single way of declaring a `Schema` nullable? With this PR we have two methods for the same purpose:
   * `nullableSchema`
   * `getNullableBuilder`
   
   Either use `nullableSchema` everywhere or use `getNullableBuilder(...).type(...)` everywhere.

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
##########
@@ -104,48 +104,115 @@ public void testRowTypeAvroSchemaConversion() {
                                DataTypes.FIELD("row3", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))))
                        .build().toRowDataType().getLogicalType();
                Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-               assertEquals("{\n" +
+               assertEquals("[ {\n" +

Review comment:
       I think it would be nice to add a test that checks we can convert back and forth between `DataType` and `Schema` without changing the field names.

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
                                        .record(rowName)
                                        .fields();
                                for (int i = 0; i < rowType.getFieldCount(); i++) {
-                                       String fieldName = rowName + "_" + fieldNames.get(i);
+                                       String fieldName = fieldNames.get(i);
+                                       if (rowName.equals(fieldName)) {
+                                               // Can not build schema when the record and field have the same name
+                                               fieldName = rowName + "_" + fieldName;
+                                       }

Review comment:
       I think the correct solution will be:
   
   ```
   RowType rowType = (RowType) logicalType;
   List<String> fieldNames = rowType.getFieldNames();
   // we have to make sure the record name is different in a Schema
   SchemaBuilder.FieldAssembler<Schema> builder =
           getNullableBuilder(logicalType)
                   .record(rowName)
                   .fields();
   for (int i = 0; i < rowType.getFieldCount(); i++) {
       String fieldName = fieldNames.get(i);
       builder = builder
               .name(fieldName)
               .type(convertToSchema(rowType.getTypeAt(i), rowName + "_" + fieldName))
               .noDefault();
   }
   return builder.endRecord();
   ```

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
                                        .record(rowName)
                                        .fields();
                                for (int i = 0; i < rowType.getFieldCount(); i++) {
-                                       String fieldName = rowName + "_" + fieldNames.get(i);
+                                       String fieldName = fieldNames.get(i);
+                                       if (rowName.equals(fieldName)) {
+                                               // Can not build schema when the record and field have the same name
+                                               fieldName = rowName + "_" + fieldName;
+                                       }

Review comment:
       That is not correct. The builder does support the same name for fields at different nesting levels.
   
   What Avro does not support is two record types with the same name but different schemas, and rightly so. Therefore a schema like:
   
   ```
   {
       "type": "record",
       "name": "top",
       "fields": [
           {
               "name": "top",
               "type": {
                   "type": "record",
                   "name": "nested",
                   "fields": [
                       {"type": "string", "name": "top"}
                   ]
               }
           }
       ]
   }
   ```
   is valid and supported. However, if we change the name of the `nested` record to `top`, it will be invalid, because the record name `top` would then refer to two different schemas:
   ```
   {
       "type": "record",
       "name": "top",
       "fields": [
           {
               "name": "top",
               "type": {
                   "type": "record",
                   "name": "top",
                   "fields": [
                       {"type": "string", "name": "top"}
                   ]
               }
           }
       ]
   }
   ```
   
   I think the core problem lies in how the `rowName` is generated. I think we should never adjust the `fieldName`; instead we should append the `fieldName` to the `rowName` when naming nested records.
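
To illustrate the naming rule, here is a minimal sketch that uses neither Avro nor Flink. The class `RecordNamingSketch`, its method `recordNames`, and the map-based row model are hypothetical stand-ins for illustration only. It leaves field names untouched and derives each nested record name as `rowName + "_" + fieldName`, so the three levels that all use the field name `top` end up with three different record names in this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the converter's naming logic; not the Flink or Avro API. */
final class RecordNamingSketch {

    /**
     * Collects the record name of every row in a nested row description.
     * A row is modeled as a map from field name to either a nested row (Map)
     * or a leaf type name (String).
     */
    static List<String> recordNames(String rowName, Map<String, Object> row) {
        List<String> names = new ArrayList<>();
        names.add(rowName);
        for (Map.Entry<String, Object> field : row.entrySet()) {
            if (field.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) field.getValue();
                // The field name is never adjusted; only the nested record name gets the prefix.
                names.addAll(recordNames(rowName + "_" + field.getKey(), nested));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("top", "string");
        Map<String, Object> middle = new LinkedHashMap<>();
        middle.put("top", inner);
        Map<String, Object> outer = new LinkedHashMap<>();
        outer.put("top", middle);
        // prints [top, top_top, top_top_top]
        System.out.println(recordNames("top", outer));
    }
}
```

This only models the naming; whether the resulting names are unique for every possible input (for example, field names that already contain `_`) is not something the sketch checks.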
   
   BTW another shortcoming that I see is that we are losing the record name 
when converting from `Schema` to `DataType`. I think it is not a real issue 
though.





