[GitHub] [hudi] danny0405 commented on a diff in pull request #9133: [HUDI-6474] Added support for reading tables evolved using comprehensive schema e…
danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1257689584 ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); } - private void add(int pos, Cast cast) { -castMap.put(pos, cast); + /** + * Helper function to perform convert an arrayData from one LogicalType to another. 
+ * + * @param arrayNon-null array data to be converted; however array-elements are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted array that has the structure/specifications of that defined by the output LogicalType + */ + private static ArrayData doArrayConversion(@Nonnull ArrayData array, LogicalType fromType, LogicalType toType) { +// using Object type here as primitives are not allowed to be null +Object[] objects = new Object[array.size()]; +for (int i = 0; i < array.size(); i++) { + Object fromObject = ArrayData.createElementGetter(fromType).getElementOrNull(array, i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromType, toType).apply(fromObject) : null; + objects[i] = toObject; +} +return new GenericArrayData(objects); + } + + /** + * Helper function to perform convert a MapData from one LogicalType to another. 
+ * + * @param map Non-null map data to be converted; however, values are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted map that has the structure/specifications of that defined by the output LogicalType + */ + private static MapData doMapConversion(@Nonnull MapData map, LogicalType fromType, LogicalType toType) { +// no schema evolution is allowed on the keyType, hence, we only need to care about the valueType +LogicalType fromValueType = fromType.getChildren().get(1); +LogicalType toValueType = toType.getChildren().get(1); +LogicalType keyType = fromType.getChildren().get(0); + +final Map result = new HashMap<>(); +for (int i = 0; i < map.size(); i++) { + Object keyObject = ArrayData.createElementGetter(keyType).getElementOrNull(map.keyArray(), i); + Object fromObject = ArrayData.createElementGetter(fromValueType).getElementOrNull(map.valueArray(), i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromValueType, toValueType).apply(fromObject) : null; + result.put(keyObject, toObject); +} +return new GenericMapData(result); + } + + /** + * Helper function to perform convert a RowData from one LogicalType to another. 
+ * + * @param row Non-null row data to be converted; however, fields might contain nulls + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted row that has the structure/specifications of that defined by the output LogicalType + */ + private static RowData doRowConversion(@Nonnull RowData row, LogicalType fromType, LogicalType toType) { +// note: InternalSchema.merge guarantees that the schema to be read fromType is orientated in the same order as toType +// hence, we can match types by position as it is guaranteed that it is referencing the same field +List fromChildren = fromType.getChildren(); +List toChildren = toType.getChildren(); +ValidationUtils.checkArgument(fromChildren.size() == toChildren.size(), +"fromType [" + fromType + "] size: != toType [" + toType + "] size"); + +
[GitHub] [hudi] danny0405 commented on a diff in pull request #9133: [HUDI-6474] Added support for reading tables evolved using comprehensive schema e…
danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1255288699 ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); Review Comment: Do not throw `RuntimeException` in nested calling code path, it is very obscure for the invoker to get the perception of exceptions. Either throws a checked exception or return null as of before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #9133: [HUDI-6474] Added support for reading tables evolved using comprehensive schema e…
danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1255291445 ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); } - private void add(int pos, Cast cast) { -castMap.put(pos, cast); + /** + * Helper function to perform convert an arrayData from one LogicalType to another. 
+ * + * @param arrayNon-null array data to be converted; however array-elements are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted array that has the structure/specifications of that defined by the output LogicalType + */ + private static ArrayData doArrayConversion(@Nonnull ArrayData array, LogicalType fromType, LogicalType toType) { +// using Object type here as primitives are not allowed to be null +Object[] objects = new Object[array.size()]; +for (int i = 0; i < array.size(); i++) { + Object fromObject = ArrayData.createElementGetter(fromType).getElementOrNull(array, i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromType, toType).apply(fromObject) : null; + objects[i] = toObject; +} +return new GenericArrayData(objects); + } + + /** + * Helper function to perform convert a MapData from one LogicalType to another. 
+ * + * @param map Non-null map data to be converted; however, values are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted map that has the structure/specifications of that defined by the output LogicalType + */ + private static MapData doMapConversion(@Nonnull MapData map, LogicalType fromType, LogicalType toType) { +// no schema evolution is allowed on the keyType, hence, we only need to care about the valueType +LogicalType fromValueType = fromType.getChildren().get(1); +LogicalType toValueType = toType.getChildren().get(1); +LogicalType keyType = fromType.getChildren().get(0); + +final Map result = new HashMap<>(); +for (int i = 0; i < map.size(); i++) { + Object keyObject = ArrayData.createElementGetter(keyType).getElementOrNull(map.keyArray(), i); + Object fromObject = ArrayData.createElementGetter(fromValueType).getElementOrNull(map.valueArray(), i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromValueType, toValueType).apply(fromObject) : null; + result.put(keyObject, toObject); +} +return new GenericMapData(result); + } + + /** + * Helper function to perform convert a RowData from one LogicalType to another. 
+ * + * @param row Non-null row data to be converted; however, fields might contain nulls + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted row that has the structure/specifications of that defined by the output LogicalType + */ + private static RowData doRowConversion(@Nonnull RowData row, LogicalType fromType, LogicalType toType) { +// note: InternalSchema.merge guarantees that the schema to be read fromType is orientated in the same order as toType +// hence, we can match types by position as it is guaranteed that it is referencing the same field +List fromChildren = fromType.getChildren(); +List toChildren = toType.getChildren(); +ValidationUtils.checkArgument(fromChildren.size() == toChildren.size(), +"fromType [" + fromType + "] size: != toType [" + toType + "] size"); + +
[GitHub] [hudi] danny0405 commented on a diff in pull request #9133: [HUDI-6474] Added support for reading tables evolved using comprehensive schema e…
danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1255291076 ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); } - private void add(int pos, Cast cast) { -castMap.put(pos, cast); + /** + * Helper function to perform convert an arrayData from one LogicalType to another. 
+ * + * @param arrayNon-null array data to be converted; however array-elements are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted array that has the structure/specifications of that defined by the output LogicalType + */ + private static ArrayData doArrayConversion(@Nonnull ArrayData array, LogicalType fromType, LogicalType toType) { +// using Object type here as primitives are not allowed to be null +Object[] objects = new Object[array.size()]; +for (int i = 0; i < array.size(); i++) { + Object fromObject = ArrayData.createElementGetter(fromType).getElementOrNull(array, i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromType, toType).apply(fromObject) : null; + objects[i] = toObject; +} +return new GenericArrayData(objects); + } + + /** + * Helper function to perform convert a MapData from one LogicalType to another. 
+ * + * @param map Non-null map data to be converted; however, values are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted map that has the structure/specifications of that defined by the output LogicalType + */ + private static MapData doMapConversion(@Nonnull MapData map, LogicalType fromType, LogicalType toType) { +// no schema evolution is allowed on the keyType, hence, we only need to care about the valueType +LogicalType fromValueType = fromType.getChildren().get(1); +LogicalType toValueType = toType.getChildren().get(1); +LogicalType keyType = fromType.getChildren().get(0); + +final Map result = new HashMap<>(); +for (int i = 0; i < map.size(); i++) { + Object keyObject = ArrayData.createElementGetter(keyType).getElementOrNull(map.keyArray(), i); + Object fromObject = ArrayData.createElementGetter(fromValueType).getElementOrNull(map.valueArray(), i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromValueType, toValueType).apply(fromObject) : null; + result.put(keyObject, toObject); +} +return new GenericMapData(result); + } + + /** + * Helper function to perform convert a RowData from one LogicalType to another. 
+ * + * @param row Non-null row data to be converted; however, fields might contain nulls + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted row that has the structure/specifications of that defined by the output LogicalType + */ + private static RowData doRowConversion(@Nonnull RowData row, LogicalType fromType, LogicalType toType) { +// note: InternalSchema.merge guarantees that the schema to be read fromType is orientated in the same order as toType +// hence, we can match types by position as it is guaranteed that it is referencing the same field +List fromChildren = fromType.getChildren(); +List toChildren = toType.getChildren(); +ValidationUtils.checkArgument(fromChildren.size() == toChildren.size(), +"fromType [" + fromType + "] size: != toType [" + toType + "] size"); + +
[GitHub] [hudi] danny0405 commented on a diff in pull request #9133: [HUDI-6474] Added support for reading tables evolved using comprehensive schema e…
danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1255289246 ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); } - private void add(int pos, Cast cast) { -castMap.put(pos, cast); + /** + * Helper function to perform convert an arrayData from one LogicalType to another. 
+ * + * @param arrayNon-null array data to be converted; however array-elements are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted array that has the structure/specifications of that defined by the output LogicalType + */ + private static ArrayData doArrayConversion(@Nonnull ArrayData array, LogicalType fromType, LogicalType toType) { +// using Object type here as primitives are not allowed to be null +Object[] objects = new Object[array.size()]; +for (int i = 0; i < array.size(); i++) { + Object fromObject = ArrayData.createElementGetter(fromType).getElementOrNull(array, i); + // need to handle nulls to prevent NullPointerException in #getConversion() Review Comment: No need to create element getter for each step of the for-loop? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #9133: [HUDI-6474] Added support for reading tables evolved using comprehensive schema e…
danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1255289376 ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); } - private void add(int pos, Cast cast) { -castMap.put(pos, cast); + /** + * Helper function to perform convert an arrayData from one LogicalType to another. 
+ * + * @param arrayNon-null array data to be converted; however array-elements are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted array that has the structure/specifications of that defined by the output LogicalType + */ + private static ArrayData doArrayConversion(@Nonnull ArrayData array, LogicalType fromType, LogicalType toType) { +// using Object type here as primitives are not allowed to be null +Object[] objects = new Object[array.size()]; +for (int i = 0; i < array.size(); i++) { + Object fromObject = ArrayData.createElementGetter(fromType).getElementOrNull(array, i); + // need to handle nulls to prevent NullPointerException in #getConversion() + Object toObject = fromObject != null ? getConversion(fromType, toType).apply(fromObject) : null; + objects[i] = toObject; +} +return new GenericArrayData(objects); + } + + /** + * Helper function to perform convert a MapData from one LogicalType to another. 
+ * + * @param map Non-null map data to be converted; however, values are allowed to be null + * @param fromType The input LogicalType of the row data to be converted from + * @param toType The output LogicalType of the row data to be converted to + * @return Converted map that has the structure/specifications of that defined by the output LogicalType + */ + private static MapData doMapConversion(@Nonnull MapData map, LogicalType fromType, LogicalType toType) { +// no schema evolution is allowed on the keyType, hence, we only need to care about the valueType +LogicalType fromValueType = fromType.getChildren().get(1); +LogicalType toValueType = toType.getChildren().get(1); +LogicalType keyType = fromType.getChildren().get(0); + +final Map result = new HashMap<>(); +for (int i = 0; i < map.size(); i++) { + Object keyObject = ArrayData.createElementGetter(keyType).getElementOrNull(map.keyArray(), i); + Object fromObject = ArrayData.createElementGetter(fromValueType).getElementOrNull(map.valueArray(), i); Review Comment: ditto ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); } - private void add(int pos, Cast cast) { 
-castMap.put(pos, cast); + /** + * Helper function to convert an arrayData from one LogicalType to another. + * + * @param array Non-null
[GitHub] [hudi] danny0405 commented on a diff in pull request #9133: [HUDI-6474] Added support for reading tables evolved using comprehensive schema e…
danny0405 commented on code in PR #9133: URL: https://github.com/apache/hudi/pull/9133#discussion_r1255288699 ## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java: ## @@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType toType) { } break; } + case ARRAY: { +if (from == ARRAY) { + LogicalType fromElementType = fromType.getChildren().get(0); + LogicalType toElementType = toType.getChildren().get(0); + return array -> doArrayConversion((ArrayData) array, fromElementType, toElementType); +} +break; + } + case MAP: { +if (from == MAP) { + return map -> doMapConversion((MapData) map, fromType, toType); +} +break; + } + case ROW: { +if (from == ROW) { + // Assumption: InternalSchemaManager should produce a cast that is of the same size + return row -> doRowConversion((RowData) row, fromType, toType); +} +break; + } default: } -return null; +throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType)); Review Comment: Do not throw `RuntimeException` in a nested calling code path; it is very obscure for the invoker to get a perception of the exceptions. Either throw a checked exception or return null as before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org