This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bd2ace52ad0 fix(flink): Support read non-VECTOR columns from table 
containing VEC… (#18712)
6bd2ace52ad0 is described below

commit 6bd2ace52ad0caea656831c589e9ca2b3d8e7eb4
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon May 11 13:54:16 2026 +0800

    fix(flink): Support read non-VECTOR columns from table containing VEC… 
(#18712)
    
    * fix(flink): Support read non-VECTOR columns from table containing VECTOR 
columns
---
 .../apache/hudi/util/HoodieSchemaConverter.java    |  23 +++++
 .../hudi/util/TestHoodieSchemaConverter.java       |  54 +++++++++++
 .../vector_cross_engine_validation/README.md       |  66 ++++++++++++++
 .../vector_cross_engine_validation/vector_cow.zip  | Bin 0 -> 80614 bytes
 .../vector_cross_engine_validation/vector_mor.zip  | Bin 0 -> 77871 bytes
 .../ITTestVectorCrossEngineCompatibility.java      | 101 +++++++++++++++++++++
 6 files changed, 244 insertions(+)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index a79f19e0b782..c49142800b45 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -418,6 +418,8 @@ public class HoodieSchemaConverter {
         return convertUnion(hoodieSchema);
       case VARIANT:
         return convertVariant(hoodieSchema);
+      case VECTOR:
+        return convertVector(hoodieSchema);
       default:
         throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + 
type);
     }
@@ -474,6 +476,27 @@ public class HoodieSchemaConverter {
     return DataTypes.TIME(flinkPrecision).notNull();
   }
 
+  /**
+   * Converts a Hudi VECTOR schema into a non-null Flink ARRAY whose element type is derived
+   * from the vector's element type (see {@code convertVectorElementType}).
+   *
+   * @throws IllegalStateException if {@code schema} is not a {@code HoodieSchema.Vector}
+   */
+  private static DataType convertVector(HoodieSchema schema) {
+    if (schema instanceof HoodieSchema.Vector) {
+      HoodieSchema.Vector.VectorElementType elementKind = ((HoodieSchema.Vector) schema).getVectorElementType();
+      DataType elementDataType = convertVectorElementType(elementKind);
+      return DataTypes.ARRAY(elementDataType).notNull();
+    }
+    throw new IllegalStateException("Expected HoodieSchema.Vector but got: " + schema.getClass());
+  }
+
+  private static DataType 
convertVectorElementType(HoodieSchema.Vector.VectorElementType elementType) {
+    switch (elementType) {
+      case FLOAT:
+        return DataTypes.FLOAT().notNull();
+      case DOUBLE:
+        return DataTypes.DOUBLE().notNull();
+      case INT8:
+        return DataTypes.TINYINT().notNull();
+      default:
+        throw new IllegalArgumentException("Unsupported VECTOR element type: " 
+ elementType);
+    }
+  }
+
   private static DataType createBlob() {
     // Create nested reference ROW type
     DataType referenceType = DataTypes.ROW(
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index feddf10a258b..51b8a15f4c7b 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -476,6 +478,58 @@ public class TestHoodieSchemaConverter {
     assertTrue(dataType.getLogicalType() instanceof VarBinaryType);
   }
 
+  @Test
+  public void testVectorConversion() {
+    // A vector created without an explicit element type converts to ARRAY<FLOAT NOT NULL>.
+    assertVectorArray(
+        HoodieSchemaConverter.convertToDataType(HoodieSchema.createVector(128)),
+        LogicalTypeRoot.FLOAT, false);
+    assertVectorArray(
+        HoodieSchemaConverter.convertToDataType(
+            HoodieSchema.createVector(128, HoodieSchema.Vector.VectorElementType.DOUBLE)),
+        LogicalTypeRoot.DOUBLE, false);
+    // INT8 vectors surface in Flink as TINYINT elements.
+    assertVectorArray(
+        HoodieSchemaConverter.convertToDataType(
+            HoodieSchema.createVector(128, HoodieSchema.Vector.VectorElementType.INT8)),
+        LogicalTypeRoot.TINYINT, false);
+  }
+
+  @Test
+  public void testNullableVectorConversion() {
+    // Only the array itself becomes nullable; its elements stay NOT NULL.
+    DataType converted = HoodieSchemaConverter.convertToDataType(
+        HoodieSchema.createNullable(HoodieSchema.createVector(128)));
+    assertVectorArray(converted, LogicalTypeRoot.FLOAT, true);
+  }
+
+  @Test
+  public void testVectorInRecordConversion() {
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "test_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("embedding", HoodieSchema.createVector(128))
+        )
+    );
+
+    RowType rowType = HoodieSchemaConverter.convertToRowType(schema);
+
+    assertEquals(2, rowType.getFieldCount());
+    assertEquals("embedding", rowType.getFieldNames().get(1));
+    // A VECTOR nested in a record must convert exactly like a standalone one:
+    // a non-null ARRAY whose FLOAT elements are also NOT NULL.
+    ArrayType vectorArrayType = assertInstanceOf(ArrayType.class, rowType.getTypeAt(1));
+    assertEquals(LogicalTypeRoot.FLOAT, vectorArrayType.getElementType().getTypeRoot());
+    assertFalse(vectorArrayType.getElementType().isNullable());
+    assertFalse(vectorArrayType.isNullable());
+  }
+
+  /**
+   * Asserts that {@code dataType} is an ARRAY with the given nullability whose elements are
+   * NOT NULL and have the given type root.
+   */
+  private void assertVectorArray(DataType dataType, LogicalTypeRoot elementTypeRoot, boolean nullable) {
+    LogicalType arrayLogicalType = dataType.getLogicalType();
+    LogicalType elementType = assertInstanceOf(ArrayType.class, arrayLogicalType).getElementType();
+    assertEquals(elementTypeRoot, elementType.getTypeRoot());
+    assertFalse(elementType.isNullable());
+    assertEquals(nullable, arrayLogicalType.isNullable());
+  }
+
   @Test
   void testUnionSchemaWithMultipleRecordTypes() {
     HoodieSchema schema = 
HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
diff --git 
a/hudi-common/src/test/resources/vector_cross_engine_validation/README.md 
b/hudi-common/src/test/resources/vector_cross_engine_validation/README.md
new file mode 100644
index 000000000000..1d6472fa0bf1
--- /dev/null
+++ b/hudi-common/src/test/resources/vector_cross_engine_validation/README.md
@@ -0,0 +1,66 @@
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Generation of test files
+
+Spark 3.5 was used to generate these test files.
+
+Table schema:
+* MOR table
+```sql
+CREATE TABLE vector_table_mor (
+    id BIGINT,
+    name STRING,
+    embedding1 VECTOR(128) COMMENT 'document float embedding',
+    embedding2 VECTOR(128, DOUBLE) COMMENT 'document double embedding',
+    embedding3 VECTOR(128, INT8) COMMENT 'document INT8 embedding',
+    ts BIGINT
+) USING hudi
+LOCATION '/tmp/hudi_vector_table_mor'
+TBLPROPERTIES (
+    primaryKey = 'id',
+    preCombineField = 'ts',
+    type = 'mor',
+    hoodie.index.type = 'INMEMORY'
+);
+```
+
+* COW table
+```sql
+CREATE TABLE vector_table_cow (
+    id BIGINT,
+    name STRING,
+    embedding1 VECTOR(128) COMMENT 'document float embedding',
+    embedding2 VECTOR(128, DOUBLE) COMMENT 'document double embedding',
+    embedding3 VECTOR(128, INT8) COMMENT 'document INT8 embedding',
+    ts BIGINT
+) USING hudi
+LOCATION '/tmp/hudi_vector_table_cow'
+TBLPROPERTIES (
+    primaryKey = 'id',
+    preCombineField = 'ts',
+    type = 'cow'
+);
+```
+
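+Each table directory was then packaged into a zip archive (`vector_cow.zip` / `vector_mor.zip`, each containing a top-level `vector_cow` / `vector_mor` directory, which the Flink tests expect) with: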
+
+```shell
+cd /path/to/test/files/
+zip -r $TABLE_DIR_NAME.zip $TABLE_DIR_NAME
+```
diff --git 
a/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip 
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip
new file mode 100644
index 000000000000..a1afed0ea6e1
Binary files /dev/null and 
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip 
differ
diff --git 
a/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip 
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip
new file mode 100644
index 000000000000..b89c2474a5e2
Binary files /dev/null and 
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip 
differ
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java
new file mode 100644
index 000000000000..d45b44918014
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Integration tests for cross-engine compatibility of Hudi tables with VECTOR columns written by
+ * Spark and read by Flink: non-VECTOR columns must be readable, while reading the VECTOR columns
+ * (declared here as ARRAY types) is expected to fail.
+ *
+ * <p>Every test runs against both the COPY_ON_WRITE and MERGE_ON_READ fixture tables.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVectorCrossEngineCompatibility {
+  @TempDir
+  Path tempDir;
+
+  /**
+   * Extracts the Spark-written fixture table for {@code tableType} into the temp directory.
+   *
+   * @return the base path of the extracted Hudi table
+   */
+  private String extractTable(HoodieTableType tableType) throws Exception {
+    String resourceName = tableType == HoodieTableType.MERGE_ON_READ ? "vector_mor" : "vector_cow";
+    Path targetDir = tempDir.resolve("table");
+    HoodieTestUtils.extractZipToDirectory(
+        String.format("vector_cross_engine_validation/%s.zip", resourceName), targetDir, getClass());
+    // The archive is expected to contain a top-level directory named after the resource.
+    return targetDir.resolve(resourceName).toString();
+  }
+
+  private void createTable(TableEnvironment tableEnv, String tablePath, String tableType) throws Exception {
+    // Create a Hudi table pointing to the Spark-written data
+    // In Flink, VECTOR is represented as ARRAY<FLOAT/DOUBLE/TINYINT>
+    String createTableDdl = String.format(
+        "CREATE TABLE vector_table (\n"
+            + "  id BIGINT,\n"
+            + "  name STRING,\n"
+            + "  embedding1 ARRAY<FLOAT>,\n"
+            + "  embedding2 ARRAY<DOUBLE>,\n"
+            + "  embedding3 ARRAY<TINYINT>,\n"
+            + "  ts BIGINT,\n"
+            + "  PRIMARY KEY (id) NOT ENFORCED\n"
+            + ") WITH (\n"
+            + "  'connector' = 'hudi',\n"
+            + "  'path' = '%s',\n"
+            + "  'table.type' = '%s',\n"
+            + "  'ordering.fields' = 'ts'\n"
+            + ");",
+        tablePath, tableType);
+    tableEnv.executeSql(createTableDdl);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testValidationVectorColumnsFromCOWTable(HoodieTableType tableType) throws Exception {
+    // Validate exception will be thrown when reading VECTOR columns from a table with VECTOR columns
+    String tablePath = extractTable(tableType);
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    createTable(tableEnv, tablePath, tableType.name());
+
+    // The failure the author expected is, e.g.:
+    //   "Unexpected type exception. Primitive type: FIXED_LEN_BYTE_ARRAY. Field type: FLOAT. Field name: embedding1"
+    // Note: assertThrows' third argument is only the message reported when NO exception is thrown;
+    // the actual exception message is not asserted here.
+    assertThrows(RuntimeException.class,
+        () -> CollectionUtil.iteratorToList(tableEnv.executeSql("select * from vector_table").collect()),
+        "Reading VECTOR columns declared as ARRAY types was expected to fail");
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testReadNonVectorColumnsFromCOWTable(HoodieTableType tableType) throws Exception {
+    // Test that Flink can read non-VECTOR columns from a table with VECTOR columns
+    String tablePath = extractTable(tableType);
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    createTable(tableEnv, tablePath, tableType.name());
+
+    List<Row> rows = CollectionUtil.iteratorToList(tableEnv.executeSql("select id, name, ts from vector_table").collect());
+    assertRowsEquals(rows, "[+I[1, doc-1, 1000], +I[2, doc-2, 2000]]");
+  }
+}

Reply via email to