danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1255291076
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ########## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { + if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); + } + break; + } + case MAP: { + if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); + } + break; + } + case ROW: { + if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); + } + break; + } default: } - return null; + throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); } - private void add(int pos, Cast cast) { - castMap.put(pos, cast); + /** + * Helper function to perform convert an arrayData from one LogicalType to another. + * + * @param array Non-null array data to be converted; however array-elements are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted array that has the structure/specifications of that defined by the output LogicalType + */ + private static ArrayData doArrayConversion(@Nonnull ArrayData array, LogicalType fromType, LogicalType toType) { + // using Object type here as primitives are not allowed to be null + Object[] objects = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + Object fromObject = ArrayData.createElementGetter(fromType).getElementOrNull(array, i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? 
getConversion(fromType, toType).apply(fromObject) : null; + objects[i] = toObject; + } + return new GenericArrayData(objects); + } + + /** + * Helper function to perform convert a MapData from one LogicalType to another. + * + * @param map Non-null map data to be converted; however, values are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted map that has the structure/specifications of that defined by the output LogicalType + */ + private static MapData doMapConversion(@Nonnull MapData map, LogicalType fromType, LogicalType toType) { + // no schema evolution is allowed on the keyType, hence, we only need to care about the valueType + LogicalType fromValueType = fromType.getChildren().get(1); + LogicalType toValueType = toType.getChildren().get(1); + LogicalType keyType = fromType.getChildren().get(0); + + final Map<Object, Object> result = new HashMap<>(); + for (int i = 0; i < map.size(); i++) { + Object keyObject = ArrayData.createElementGetter(keyType).getElementOrNull(map.keyArray(), i); + Object fromObject = ArrayData.createElementGetter(fromValueType).getElementOrNull(map.valueArray(), i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromValueType, toValueType).apply(fromObject) : null; + result.put(keyObject, toObject); + } + return new GenericMapData(result); + } + + /** + * Helper function to perform convert a RowData from one LogicalType to another. 
+ * + * @param row Non-null row data to be converted; however, fields might contain nulls + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted row that has the structure/specifications of that defined by the output LogicalType + */ + private static RowData doRowConversion(@Nonnull RowData row, LogicalType fromType, LogicalType toType) { + // note: InternalSchema.merge guarantees that the schema to be read fromType is orientated in the same order as toType + // hence, we can match types by position as it is guaranteed that it is referencing the same field + List<LogicalType> fromChildren = fromType.getChildren(); + List<LogicalType> toChildren = toType.getChildren(); + ValidationUtils.checkArgument(fromChildren.size() == toChildren.size(), + "fromType [" + fromType + "] size: != toType [" + toType + "] size"); + + GenericRowData rowData = new GenericRowData(toType.getChildren().size()); + for (int i = 0; i < toChildren.size(); i++) { + Object fromVal = RowData.createFieldGetter(fromChildren.get(i), i).getFieldOrNull(row); + Object toVal; Review Comment: Caution on performance: this constructs a new field getter for every field on each row->row conversion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org