hudi-bot opened a new issue, #17127: URL: https://github.com/apache/hudi/issues/17127
org.apache.hudi.table.TestHoodieFileGroupReaderOnFlink#getSchemaEvolutionConfigs should have most/all cases enabled. org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader#ParquetColumnarRowSplitReader reads the parquet footer. One solution to adding this support would be to pull this up a few layers, use the util method pruneDataSchema from: [https://github.com/apache/hudi/pull/13654] and then cast/project from the pruned schema to the requested schema. The test datagen will need to be updated because there is a Flink bug (HUDI-9603). Here are some fixes I already did: {code:java} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java index a0741ea6705..fb46996317e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java @@ -69,7 +69,7 @@ public class SchemaEvolvingRowDataProjection implements RowProjection { case ROW: return createRowProjection(fromType, toType, renamedColumns, fieldNameStack); default: - if (fromType.equals(toType)) { + if (fromType.equals(toType) || fromType.getTypeRoot().equals(toType.getTypeRoot())) { return TypeConverters.NOOP_CONVERTER; } else { // return TypeConverter directly for non-composite type diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java index cef2df7c037..83b886307ab 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java @@ -53,7 +53,10 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; import 
static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP; import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY; import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; +import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes; +import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; /** * Tool class used to perform supported casts from a {@link LogicalType} to another @@ -81,6 +84,20 @@ public class TypeConverters { LogicalTypeRoot to = toType.getTypeRoot(); switch (to) { + case VARBINARY: { + if (from == VARCHAR) { + return new TypeConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object val) { + return getUTF8Bytes(val.toString()); + } + }; + } + break; + } + case BIGINT: { if (from == INTEGER) { return new TypeConverter() { @@ -202,6 +219,16 @@ public class TypeConverters { } }; } + if (from == VARBINARY) { + return new TypeConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object val) { + return new BinaryStringData(fromUTF8Bytes((byte[]) val)); + } + }; + } break; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index bb671a8f5ef..f165161b317 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -1336,6 +1336,9 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA // Bytes public boolean bytesToStringSupport = true; + + // TODO: [HUDI-9607] Flink VARBINARY in array and map + public boolean supportBytesInArrayMap = true; } private enum 
SchemaEvolutionTypePromotionCase { @@ -1430,13 +1433,13 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA if (toplevel) { if (configs.mapSupport) { List<Schema.Field> mapFields = new ArrayList<>(baseFields.size()); - addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map"); + addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map", !configs.supportBytesInArrayMap); finalFields.add(new Schema.Field(fieldPrefix + "Map", Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false, mapFields)), "", null)); } - if (configs.arraySupport) { + if (configs.arraySupport && configs.anyArraySupport) { List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size()); - addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array"); + addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array", !configs.supportBytesInArrayMap); finalFields.add(new Schema.Field(fieldPrefix + "Array", Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace, false, arrayFields)), "", null)); } } @@ -1444,12 +1447,21 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA } private static void addFieldsHelper(List<Schema.Field> finalFields, List<Schema.Type> baseFields, String fieldPrefix) { + addFieldsHelper(finalFields, baseFields, fieldPrefix, false); + } + + // TODO: [HUDI-9603] remove replaceBytesWithStrings when the issue is fixed + private static void addFieldsHelper(List<Schema.Field> finalFields, List<Schema.Type> baseFields, String fieldPrefix, boolean replaceBytesWithStrings) { for (int i = 0; i < baseFields.size(); i++) { if (baseFields.get(i) == Schema.Type.BOOLEAN) { // boolean fields are added fields finalFields.add(new Schema.Field(fieldPrefix + i, AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null)); } else { - finalFields.add(new Schema.Field(fieldPrefix + i, Schema.create(baseFields.get(i)), "", null)); + Schema.Type type = baseFields.get(i); + if (replaceBytesWithStrings && type == Schema.Type.BYTES) { 
+ type = Schema.Type.STRING; + } + finalFields.add(new Schema.Field(fieldPrefix + i, Schema.create(type), "", null)); } } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java index dc66a0e6a74..f659e06ad50 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java @@ -177,23 +177,9 @@ public class TestHoodieFileGroupReaderOnFlink extends TestHoodieFileGroupReaderB @Override public HoodieTestDataGenerator.SchemaEvolutionConfigs getSchemaEvolutionConfigs() { HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs(); - configs.nestedSupport = false; + configs.supportBytesInArrayMap = false; configs.arraySupport = false; - configs.mapSupport = false; configs.anyArraySupport = false; - configs.addNewFieldSupport = false; - configs.intToLongSupport = false; - configs.intToFloatSupport = false; - configs.intToDoubleSupport = false; - configs.intToStringSupport = false; - configs.longToFloatSupport = false; - configs.longToDoubleSupport = false; - configs.longToStringSupport = false; - configs.floatToDoubleSupport = false; - configs.floatToStringSupport = false; - configs.doubleToStringSupport = false; - configs.stringToBytesSupport = false; - configs.bytesToStringSupport = false; return configs; }{code} ## JIRA info - Link: https://issues.apache.org/jira/browse/HUDI-9670 - Type: Bug - Fix version(s): - 1.1.1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
