hudi-agent commented on code in PR #18877:
URL: https://github.com/apache/hudi/pull/18877#discussion_r3371883511


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/DefaultCleanHandler.java:
##########
@@ -111,6 +111,6 @@ public void close() {
         throw new HoodieException("Failed to close executor of clean 
handler.", e);
       }
     }
-    this.writeClient.clean();
+    this.writeClient.close();

Review Comment:
   🤖 This change from `writeClient.clean()` to `writeClient.close()` looks 
unrelated to vector-writer support, and it's a real behavior change: the 
handler used to trigger one final synchronous `clean()` during shutdown, but it 
will now skip that and instead close the underlying write client (which 
`CleanFunction.close()` previously did not do). Was this an intentional 
resource-leak fix that got pulled in here, or accidental? If intentional, could 
it be split into its own commit/PR so the motivation is visible? @danny0405 
might want to take a quick look as well.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java:
##########
@@ -97,7 +106,8 @@ public void write(final RowData record) {
     recordConsumer.endMessage();
   }
 
-  private FieldWriter createWriter(LogicalType t) {
+  private FieldWriter createWriter(LogicalType t, HoodieSchema hoodieSchema) {

Review Comment:
   🤖 nit: could you rename the second parameter from `hoodieSchema` to 
`fieldSchema` (or `columnSchema`)? Inside this method it holds the schema for 
the single field being written, but `hoodieSchema` reads like the root record 
schema — same name as the outer class's constructor parameter — which could 
mislead a future reader into thinking it carries top-level record context.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java:
##########
@@ -238,6 +280,97 @@ public static HoodieSchema convertToSchema(LogicalType 
logicalType, String rowNa
     return nullable ? HoodieSchema.createNullable(schema) : schema;
   }
 
+  private static Map<String, Integer> parseVectorColumns(String vectorColumns) 
{
+    Map<String, Integer> parsed = new LinkedHashMap<>();
+    for (String rawEntry : vectorColumns.split(",")) {
+      String entry = rawEntry.trim();
+      if (entry.isEmpty()) {
+        continue;
+      }
+      String[] parts = entry.split(":", -1);
+      if (parts.length > 2 || parts[0].trim().isEmpty()) {
+        throw new IllegalArgumentException(
+            "Invalid VECTOR column descriptor '" + entry + "'. Expected 
format: columnName[:dimension].");
+      }
+      String columnName = parts[0].trim();
+      String normalizedColumnName = columnName.toLowerCase(Locale.ROOT);
+      if (parsed.containsKey(normalizedColumnName)) {
+        throw new IllegalArgumentException("Duplicate VECTOR column descriptor 
for column: " + columnName);
+      }
+      int dimension = DEFAULT_VECTOR_DIMENSION;
+      if (parts.length == 2) {
+        String dimensionText = parts[1].trim();
+        if (dimensionText.isEmpty()) {
+          throw new IllegalArgumentException(
+              "Invalid VECTOR column descriptor '" + entry + "'. Dimension 
must not be empty.");
+        }
+        try {
+          dimension = Integer.parseInt(dimensionText);
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid VECTOR dimension for 
column '" + columnName + "': " + dimensionText, e);
+        }
+      }
+      if (dimension <= 0) {
+        throw new IllegalArgumentException("VECTOR dimension must be positive 
for column '" + columnName + "': " + dimension);
+      }
+      parsed.put(normalizedColumnName, dimension);
+    }
+    return parsed;
+  }
+
+  private static HoodieSchema convertVectorField(String fieldName, LogicalType 
fieldType, int dimension) {
+    if (!(fieldType instanceof ArrayType)) {
+      throw new IllegalArgumentException(
+          "VECTOR column '" + fieldName + "' must be declared as ARRAY<FLOAT>, 
ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got: "
+              + fieldType.asSummaryString());
+    }
+    HoodieSchema.Vector.VectorElementType elementType = 
inferVectorElementType(fieldName, ((ArrayType) fieldType).getElementType());
+    HoodieSchema vectorSchema = HoodieSchema.createVector(dimension, 
elementType);
+    return fieldType.isNullable() ? HoodieSchema.createNullable(vectorSchema) 
: vectorSchema;
+  }
+
+  private static HoodieSchema.Vector.VectorElementType 
inferVectorElementType(String fieldName, LogicalType elementType) {
+    switch (elementType.getTypeRoot()) {
+      case FLOAT:
+        return HoodieSchema.Vector.VectorElementType.FLOAT;
+      case DOUBLE:
+        return HoodieSchema.Vector.VectorElementType.DOUBLE;
+      case TINYINT:
+        return HoodieSchema.Vector.VectorElementType.INT8;
+      default:
+        throw new IllegalArgumentException(
+            "VECTOR column '" + fieldName + "' must use ARRAY<FLOAT>, 
ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got ARRAY<"
+                + elementType.asSummaryString() + ">.");
+    }
+  }
+
+  /**
+   * Validates that all configured VECTOR column descriptors resolve to 
top-level fields.
+   *
+   * <p>Callers that accept {@code hoodie.vector.columns} as a table option 
should run this
+   * validation before schema inference so an unknown column is rejected 
instead of being
+   * silently ignored by schema conversion.
+   *
+   * @param logicalType      Flink logical type
+   * @param vectorColumnsStr comma-separated vector column descriptors, or 
null/empty
+   */
+  public static void validateVectorColumns(LogicalType logicalType, String 
vectorColumnsStr) {
+    if (vectorColumnsStr == null || vectorColumnsStr.trim().isEmpty()) {
+      return;
+    }
+    Map<String, Integer> vectorColumns = parseVectorColumns(vectorColumnsStr);
+    List<String> fieldNames = ((RowType) logicalType).getFieldNames();
+    List<String> normalizedFieldNames = fieldNames.stream()
+        .map(fieldName -> fieldName.toLowerCase(Locale.ROOT))
+        .collect(Collectors.toList());
+    vectorColumns.keySet().stream()
+        .filter(vectorColumn -> !normalizedFieldNames.contains(vectorColumn))
+        .findFirst()
+        .ifPresent(vectorColumn -> {

Review Comment:
   🤖 nit: using `ifPresent` to throw an exception is a bit surprising here — 
`ifPresent` signals an optional side-effect, but throwing is an unconditional 
failure path. Have you considered a plain `Optional<String> unknown = ...; if 
(unknown.isPresent()) { throw new IllegalArgumentException(...); }` instead? It 
reads more naturally for a validation method.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to