hudi-agent commented on code in PR #18877:
URL: https://github.com/apache/hudi/pull/18877#discussion_r3373483581
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java:
##########
@@ -141,21 +151,29 @@ private FieldWriter createWriter(LogicalType t) {
return new Timestamp96Writer(tsLtzPrecision);
}
Review Comment:
🤖 nit: the `resolvedSchema != null` guard here (and the `resolvedSchema ==
null ? null :` on the `createWriter` call a few lines down) is inconsistent
with the MAP and ROW branches, which access `resolvedSchema` directly without
any null check. Since `resolvedSchema = fieldSchema.getNonNullType()` and
`fieldSchema` is always non-null at every call site, `resolvedSchema` can never
be null — these guards read like defensive code that isn't actually defensive,
and could mislead a future reader into thinking this path is somehow different.
Could you drop the null checks to match the MAP/ROW style?
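To illustrate the suggested style, here is a minimal, self-contained sketch. `Schema`, `getNonNullType()` and `describeWriter()` below are hypothetical stand-ins, not Hudi's `HoodieSchema` or the actual `createWriter` code. The sketch assumes the contract stated above: `getNonNullType()` never returns null. Under that assumption every composite branch, ARRAY included, can use `resolvedSchema` directly, as MAP and ROW already do.

```java
public class NonNullResolvedSchemaSketch {

  /** Hypothetical stand-in for a schema that may be wrapped as nullable (not Hudi's HoodieSchema). */
  static final class Schema {
    final String kind;     // "array", "map", "row", or a primitive name such as "float"
    final Schema element;  // element schema when kind is "array", otherwise null
    final boolean nullable;

    Schema(String kind, Schema element, boolean nullable) {
      this.kind = kind;
      this.element = element;
      this.nullable = nullable;
    }

    /** Assumed contract: unwraps the nullable wrapper and never returns null. */
    Schema getNonNullType() {
      return nullable ? new Schema(kind, element, false) : this;
    }
  }

  /** Every branch dereferences resolvedSchema directly; none carries a null guard. */
  static String describeWriter(Schema fieldSchema) {
    Schema resolvedSchema = fieldSchema.getNonNullType(); // never null under the assumed contract
    switch (resolvedSchema.kind) {
      case "array":
        // Same style as MAP/ROW: no "resolvedSchema == null ? null : ..." guard.
        return "ArrayWriter<" + describeWriter(resolvedSchema.element) + ">";
      case "map":
        return "MapWriter";
      case "row":
        return "RowWriter";
      default:
        return "PrimitiveWriter(" + resolvedSchema.kind + ")";
    }
  }

  public static void main(String[] args) {
    Schema floats = new Schema("float", null, true);
    Schema vector = new Schema("array", floats, true);
    System.out.println(describeWriter(vector)); // prints ArrayWriter<PrimitiveWriter(float)>
  }
}
```

If `fieldSchema` were ever null, the NPE would surface at `getNonNullType()` itself, before any branch runs, so a guard on `resolvedSchema` further down could still never fire.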
- AI-generated; verify before applying.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java:
##########
@@ -238,6 +280,97 @@ public static HoodieSchema convertToSchema(LogicalType logicalType, String rowNa
     return nullable ? HoodieSchema.createNullable(schema) : schema;
   }
+  private static Map<String, Integer> parseVectorColumns(String vectorColumns) {
+    Map<String, Integer> parsed = new LinkedHashMap<>();
+    for (String rawEntry : vectorColumns.split(",")) {
+      String entry = rawEntry.trim();
+      if (entry.isEmpty()) {
+        continue;
+      }
+      String[] parts = entry.split(":", -1);
+      if (parts.length > 2 || parts[0].trim().isEmpty()) {
+        throw new IllegalArgumentException(
+            "Invalid VECTOR column descriptor '" + entry + "'. Expected format: columnName[:dimension].");
+      }
+      String columnName = parts[0].trim();
+      String normalizedColumnName = columnName.toLowerCase(Locale.ROOT);
+      if (parsed.containsKey(normalizedColumnName)) {
+        throw new IllegalArgumentException("Duplicate VECTOR column descriptor for column: " + columnName);
+      }
+      int dimension = DEFAULT_VECTOR_DIMENSION;
+      if (parts.length == 2) {
+        String dimensionText = parts[1].trim();
+        if (dimensionText.isEmpty()) {
+          throw new IllegalArgumentException(
+              "Invalid VECTOR column descriptor '" + entry + "'. Dimension must not be empty.");
+        }
+        try {
+          dimension = Integer.parseInt(dimensionText);
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid VECTOR dimension for column '" + columnName + "': " + dimensionText, e);
+        }
+      }
+      if (dimension <= 0) {
+        throw new IllegalArgumentException("VECTOR dimension must be positive for column '" + columnName + "': " + dimension);
+      }
+      parsed.put(normalizedColumnName, dimension);
+    }
+    return parsed;
+  }
+
+  private static HoodieSchema convertVectorField(String fieldName, LogicalType fieldType, int dimension) {
+    if (!(fieldType instanceof ArrayType)) {
+      throw new IllegalArgumentException(
+          "VECTOR column '" + fieldName + "' must be declared as ARRAY<FLOAT>, ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got: "
+              + fieldType.asSummaryString());
+    }
+    HoodieSchema.Vector.VectorElementType elementType = inferVectorElementType(fieldName, ((ArrayType) fieldType).getElementType());
+    HoodieSchema vectorSchema = HoodieSchema.createVector(dimension, elementType);
+    return fieldType.isNullable() ? HoodieSchema.createNullable(vectorSchema) : vectorSchema;
+  }
+
+  private static HoodieSchema.Vector.VectorElementType inferVectorElementType(String fieldName, LogicalType elementType) {
+    switch (elementType.getTypeRoot()) {
+      case FLOAT:
+        return HoodieSchema.Vector.VectorElementType.FLOAT;
+      case DOUBLE:
+        return HoodieSchema.Vector.VectorElementType.DOUBLE;
+      case TINYINT:
+        return HoodieSchema.Vector.VectorElementType.INT8;
+      default:
+        throw new IllegalArgumentException(
+            "VECTOR column '" + fieldName + "' must use ARRAY<FLOAT>, ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got ARRAY<"
+                + elementType.asSummaryString() + ">.");
+    }
+  }
+
+  /**
+   * Validates that all configured VECTOR column descriptors resolve to top-level fields.
+   *
+   * <p>Callers that accept {@code hoodie.vector.columns} as a table option should run this
+   * validation before schema inference so an unknown column is rejected instead of being
+   * silently ignored by schema conversion.
+   *
+   * @param logicalType Flink logical type
+   * @param vectorColumnsStr comma-separated vector column descriptors, or null/empty
+   */
+  public static void validateVectorColumns(LogicalType logicalType, String vectorColumnsStr) {
+    if (vectorColumnsStr == null || vectorColumnsStr.trim().isEmpty()) {
+      return;
+    }
+    Map<String, Integer> vectorColumns = parseVectorColumns(vectorColumnsStr);
+    List<String> fieldNames = ((RowType) logicalType).getFieldNames();
+    List<String> normalizedFieldNames = fieldNames.stream()
+        .map(fieldName -> fieldName.toLowerCase(Locale.ROOT))
+        .collect(Collectors.toList());
Review Comment:
🤖 nit: `normalizedFieldNames` is only ever used for `.contains()` membership
checks on line 367 — collecting into a `Set` (`Collectors.toSet()`) would
signal that intent clearly and avoid the linear scan per lookup.
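A minimal sketch of the suggested change, assuming the check at line 367 is a per-column `contains()` lookup of the lower-cased descriptor keys against `normalizedFieldNames` (that part of the method is not shown in this hunk). The helper name `unknownVectorColumns` and its return-a-list shape are hypothetical; the real `validateVectorColumns` presumably throws instead.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class VectorColumnValidationSketch {

  /**
   * Hypothetical helper: returns the configured VECTOR columns (keys already lower-cased,
   * as parseVectorColumns in the diff produces them) that match no top-level field.
   */
  static List<String> unknownVectorColumns(List<String> fieldNames, Map<String, Integer> vectorColumns) {
    // A Set states the intent (membership only); it is hash-based in current JDKs,
    // so contains() is O(1) on average instead of a linear List scan.
    Set<String> normalizedFieldNames = fieldNames.stream()
        .map(fieldName -> fieldName.toLowerCase(Locale.ROOT))
        .collect(Collectors.toSet());
    return vectorColumns.keySet().stream()
        .filter(column -> !normalizedFieldNames.contains(column))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, Integer> vectorColumns = new LinkedHashMap<>();
    vectorColumns.put("embedding", 128);
    vectorColumns.put("missing_col", 64);
    List<String> fieldNames = Arrays.asList("id", "Embedding", "ts");
    System.out.println(unknownVectorColumns(fieldNames, vectorColumns)); // prints [missing_col]
  }
}
```

With only a handful of top-level fields the speedup is negligible; the main win is that a `Set` states the intent. `Collectors.toSet()` makes no guarantee about the returned `Set` implementation or its iteration order, which is fine here because only membership is queried.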
--
This is an automated message from the Apache Git Service.