hudi-bot opened a new issue, #17127:
URL: https://github.com/apache/hudi/issues/17127

   
org.apache.hudi.table.TestHoodieFileGroupReaderOnFlink#getSchemaEvolutionConfigs
   should have most/all schema evolution cases enabled. 
   
    
   
   
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader#ParquetColumnarRowSplitReader
 reads the Parquet footer. One way to add schema evolution support would be to 
pull that footer read up a few layers, use the util method pruneDataSchema from 
[https://github.com/apache/hudi/pull/13654], and then cast/project from the 
pruned schema to the requested schema.
   
    
   
    The test data generator will need to be updated because of a Flink bug 
(HUDI-9603).
   
   Here are some fixes I have already made:
   {code:java}
   diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
   index a0741ea6705..fb46996317e 100644
   --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
   +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
   @@ -69,7 +69,7 @@ public class SchemaEvolvingRowDataProjection implements 
RowProjection {
          case ROW:
            return createRowProjection(fromType, toType, renamedColumns, 
fieldNameStack);
          default:
   -        if (fromType.equals(toType)) {
   +        if (fromType.equals(toType) || 
fromType.getTypeRoot().equals(toType.getTypeRoot())) {
              return TypeConverters.NOOP_CONVERTER;
            } else {
              // return TypeConverter directly for non-composite type
   diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
   index cef2df7c037..83b886307ab 100644
   --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
   +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
   @@ -53,7 +53,10 @@ import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
    import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
    import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
    import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
   +import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
    import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
   +import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
   +import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
    
    /**
     * Tool class used to perform supported casts from a {@link LogicalType} to 
another
   @@ -81,6 +84,20 @@ public class TypeConverters {
        LogicalTypeRoot to = toType.getTypeRoot();
    
        switch (to) {
   +      case VARBINARY: {
   +        if (from == VARCHAR) {
   +          return new TypeConverter() {
   +            private static final long serialVersionUID = 1L;
   +
   +            @Override
   +            public Object convert(Object val) {
   +              return getUTF8Bytes(val.toString());
   +            }
   +          };
   +        }
   +        break;
   +      }
   +
          case BIGINT: {
            if (from == INTEGER) {
              return new TypeConverter() {
   @@ -202,6 +219,16 @@ public class TypeConverters {
                }
              };
            }
   +        if (from == VARBINARY) {
   +          return new TypeConverter() {
   +            private static final long serialVersionUID = 1L;
   +
   +            @Override
   +            public Object convert(Object val) {
   +              return new BinaryStringData(fromUTF8Bytes((byte[]) val));
   +            }
   +          };
   +        }
            break;
          }
    
   diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
   index bb671a8f5ef..f165161b317 100644
   --- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
   +++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
   @@ -1336,6 +1336,9 @@ Generate random record using 
TRIP_ENCODED_DECIMAL_SCHEMA
    
        // Bytes
        public boolean bytesToStringSupport = true;
   +
   +    // TODO: [HUDI-9607] Flink VARBINARY in array and map
   +    public boolean supportBytesInArrayMap = true;
      }
    
      private enum SchemaEvolutionTypePromotionCase {
   @@ -1430,13 +1433,13 @@ Generate random record using 
TRIP_ENCODED_DECIMAL_SCHEMA
        if (toplevel) {
          if (configs.mapSupport) {
            List<Schema.Field> mapFields = new ArrayList<>(baseFields.size());
   -        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map");
   +        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map", 
!configs.supportBytesInArrayMap);
            finalFields.add(new Schema.Field(fieldPrefix + "Map", 
Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false, 
mapFields)), "", null));
          }
    
   -      if (configs.arraySupport) {
   +      if (configs.arraySupport && configs.anyArraySupport) {
            List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size());
   -        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array");
   +        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array", 
!configs.supportBytesInArrayMap);
            finalFields.add(new Schema.Field(fieldPrefix + "Array", 
Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace, 
false, arrayFields)), "", null));
          }
        }
   @@ -1444,12 +1447,21 @@ Generate random record using 
TRIP_ENCODED_DECIMAL_SCHEMA
      }
    
      private static void addFieldsHelper(List<Schema.Field> finalFields, 
List<Schema.Type> baseFields, String fieldPrefix) {
   +    addFieldsHelper(finalFields, baseFields, fieldPrefix, false);
   +  }
   +
   +  // TODO: [HUDI-9603] remove replaceBytesWithStrings when the issue is 
fixed
   +  private static void addFieldsHelper(List<Schema.Field> finalFields, 
List<Schema.Type> baseFields, String fieldPrefix, boolean 
replaceBytesWithStrings) {
        for (int i = 0; i < baseFields.size(); i++) {
          if (baseFields.get(i) == Schema.Type.BOOLEAN) {
            // boolean fields are added fields
            finalFields.add(new Schema.Field(fieldPrefix + i, 
AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null));
          } else {
   -        finalFields.add(new Schema.Field(fieldPrefix + i, 
Schema.create(baseFields.get(i)), "", null));
   +        Schema.Type type = baseFields.get(i);
   +        if (replaceBytesWithStrings && type == Schema.Type.BYTES) {
   +          type = Schema.Type.STRING;
   +        }
   +        finalFields.add(new Schema.Field(fieldPrefix + i, 
Schema.create(type), "", null));
          }
        }
      }
   diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
   index dc66a0e6a74..f659e06ad50 100644
   --- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
   +++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
   @@ -177,23 +177,9 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
      @Override
      public HoodieTestDataGenerator.SchemaEvolutionConfigs 
getSchemaEvolutionConfigs() {
        HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new 
HoodieTestDataGenerator.SchemaEvolutionConfigs();
   -    configs.nestedSupport = false;
   +    configs.supportBytesInArrayMap = false;
        configs.arraySupport = false;
   -    configs.mapSupport = false;
        configs.anyArraySupport = false;
   -    configs.addNewFieldSupport = false;
   -    configs.intToLongSupport = false;
   -    configs.intToFloatSupport = false;
   -    configs.intToDoubleSupport = false;
   -    configs.intToStringSupport = false;
   -    configs.longToFloatSupport = false;
   -    configs.longToDoubleSupport = false;
   -    configs.longToStringSupport = false;
   -    configs.floatToDoubleSupport = false;
   -    configs.floatToStringSupport = false;
   -    configs.doubleToStringSupport = false;
   -    configs.stringToBytesSupport = false;
   -    configs.bytesToStringSupport = false;
        return configs;
      }{code}
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-9670
   - Type: Bug
   - Fix version(s):
     - 1.1.1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to