This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d6f0c11dbbb5 feat(flink): Support reading and writing VECTOR fields in
clustering (#18913)
d6f0c11dbbb5 is described below
commit d6f0c11dbbb582225cd77329666ce3b68fc002be
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Jun 22 12:58:24 2026 +0800
feat(flink): Support reading and writing VECTOR fields in clustering
(#18913)
---
.../row/parquet/ParquetSchemaConverter.java | 14 +-
.../row/parquet/TestParquetSchemaConverter.java | 14 ++
.../hudi/sink/clustering/ClusteringOperator.java | 5 +-
.../apache/hudi/table/ITTestVectorDataSource.java | 203 +++++++++++++++++++--
4 files changed, 221 insertions(+), 15 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5bad03d9253e..5efe4aeee62c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -148,9 +148,17 @@ public class ParquetSchemaConverter {
dataType = DataTypes.of(new TimestampType(9));
break;
case FIXED_LEN_BYTE_ARRAY:
- LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
- (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
- dataType = DataTypes.of(new DecimalType(decimalType.getPrecision(),
decimalType.getScale()));
+ if (logicalType instanceof
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
+ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType;
+ dataType = DataTypes.of(new
DecimalType(decimalType.getPrecision(), decimalType.getScale()));
+ } else {
+ // VECTOR columns are stored as bare FIXED_LEN_BYTE_ARRAY without
a Parquet logical type annotation,
+ // see HoodieParquetFileFormatHelper#buildImplicitSchemaChangeInfo.
+ // Treat the physical type as bytes here; vector semantics are
restored
+ // from the schema/footer metadata by the vector-aware read path.
+ dataType = DataTypes.BYTES();
+ }
break;
default:
throw new UnsupportedOperationException("Unsupported type: " +
parquetType);
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 66c6f9e86263..23c2d6dac56e 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -241,6 +241,20 @@ public class TestParquetSchemaConverter {
assertEquals(16, featuresType.getTypeLength());
}
+ @Test
+ void testUnannotatedFixedLenByteArrayConvertsToBytes() {
+ MessageType messageType = new MessageType(
+ "test",
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
Type.Repetition.REQUIRED).named("id"),
+ Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
Type.Repetition.OPTIONAL)
+ .length(8)
+ .named("embedding"));
+
+ RowType rowType = ParquetSchemaConverter.convertToRowType(messageType);
+
+ assertEquals(DataTypes.BYTES().getLogicalType(), rowType.getTypeAt(1));
+ }
+
@Test
void testVectorFooterMetadataComesFromHoodieSchema() {
HoodieSchema hoodieSchema = HoodieSchema.createRecord(
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index f67be66b3c3e..36349acbe031 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -168,7 +168,10 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
this.writeClient = FlinkWriteClients.createWriteClient(conf,
getRuntimeContext());
this.table = writeClient.getHoodieTable();
- this.schema = HoodieSchemaConverter.convertToSchema(rowType);
+ this.schema = HoodieSchemaConverter.convertToSchema(
+ rowType,
+
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+ conf.get(FlinkOptions.VECTOR_COLUMNS));
// Since there exists discrepancies between flink and spark dealing with
nullability of primary key field,
// and there may be some files written by spark, force update schema as
nullable to make sure clustering
// scan successfully without schema validating exception.
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
index 1b6cda8c1013..8f02f49ba16b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
@@ -18,20 +18,41 @@
package org.apache.hudi.table;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestTableEnvs;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.Test;
@@ -42,9 +63,13 @@ import org.junit.jupiter.params.provider.EnumSource;
import java.lang.reflect.Array;
import java.nio.file.Path;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -200,6 +225,30 @@ public class ITTestVectorDataSource {
assertEquals("new3", rows.get(2).getField(2));
}
+ @Test
+ public void testVectorCopyOnWriteClustering() throws Exception {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ String tablePath = tempDir.resolve("clustering").toUri().toString();
+ String vectorColsConf = "embedding:2,features:2,codes:2";
+ createVectorTable(tableEnv, "vector_table", tablePath,
HoodieTableType.COPY_ON_WRITE, vectorColsConf, "insert");
+
+ execInsertSql(tableEnv,
+ "INSERT INTO vector_table(id, embedding, features, codes, label, ts)
VALUES "
+ + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(1.1 AS FLOAT)],
ARRAY[CAST(10.0 AS DOUBLE), CAST(10.1 AS DOUBLE)], "
+ + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT)], 'cluster1', 1),
"
+ + "('id2', ARRAY[CAST(2.0 AS FLOAT), CAST(2.2 AS FLOAT)],
ARRAY[CAST(20.0 AS DOUBLE), CAST(20.2 AS DOUBLE)], "
+ + " ARRAY[CAST(3 AS TINYINT), CAST(4 AS TINYINT)], 'cluster2', 2),
"
+ + "('id3', ARRAY[CAST(3.0 AS FLOAT), CAST(3.3 AS FLOAT)],
ARRAY[CAST(30.0 AS DOUBLE), CAST(30.3 AS DOUBLE)], "
+ + " ARRAY[CAST(5 AS TINYINT), CAST(6 AS TINYINT)], 'cluster3',
3)");
+
+ runClustering(tablePath, vectorColsConf);
+
+ assertStoredVectorSchema(tablePath, "embedding", 2,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ assertStoredVectorSchema(tablePath, "features", 2,
HoodieSchema.Vector.VectorElementType.DOUBLE);
+ assertStoredVectorSchema(tablePath, "codes", 2,
HoodieSchema.Vector.VectorElementType.INT8);
+ assertClusteredVectorRows(tableEnv);
+ }
+
@Test
public void testVectorDimensionMismatchFails() {
TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
@@ -229,6 +278,32 @@ public class ITTestVectorDataSource {
HoodieTableType tableType,
String vectorColumns,
String writeOperation) {
+ createVectorTable(tableEnv, tableName, tablePath, tableType,
vectorColumns, writeOperation, Collections.emptyMap());
+ }
+
+ private static void createVectorTable(
+ TableEnvironment tableEnv,
+ String tableName,
+ String tablePath,
+ HoodieTableType tableType,
+ String vectorColumns,
+ String writeOperation,
+ Map<String, String> extraOptions) {
+ Map<String, String> options = new LinkedHashMap<>();
+ options.put("connector", "hudi");
+ options.put("path", tablePath);
+ options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
+ options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
+ options.put(FlinkOptions.VECTOR_COLUMNS.key(), vectorColumns);
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+ options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+ if (writeOperation != null) {
+ options.put(FlinkOptions.OPERATION.key(), writeOperation);
+ }
+ options.put(FlinkOptions.WRITE_TASKS.key(), "1");
+ options.put(FlinkOptions.READ_TASKS.key(), "1");
+ options.putAll(extraOptions);
+
tableEnv.executeSql(
"CREATE TABLE " + tableName + " ("
+ " id STRING,"
@@ -241,20 +316,126 @@ public class ITTestVectorDataSource {
+ " ts BIGINT,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
- + " 'connector' = 'hudi',"
- + " 'path' = '" + tablePath + "',"
- + " '" + FlinkOptions.TABLE_TYPE.key() + "' = '" +
tableType.name() + "',"
- + " '" + FlinkOptions.ORDERING_FIELDS.key() + "' = 'ts',"
- + " '" + FlinkOptions.VECTOR_COLUMNS.key() + "' = '" +
vectorColumns + "',"
- + " '" + FlinkOptions.METADATA_ENABLED.key() + "' = 'false',"
- + " '" + FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key() + "' =
'false',"
- + " '" + FlinkOptions.COMPACTION_ASYNC_ENABLED.key() + "' =
'false',"
- + (writeOperation == null ? "" : " '" +
FlinkOptions.OPERATION.key() + "' = '" + writeOperation + "',")
- + " '" + FlinkOptions.WRITE_TASKS.key() + "' = '1',"
- + " '" + FlinkOptions.READ_TASKS.key() + "' = '1'"
+ + formatOptions(options)
+ ")");
}
+ private static void runClustering(String tablePath, String vectorColsConf)
throws Exception {
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tablePath;
+ cfg.targetPartitions = 1;
+ cfg.sortMemory = 128;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ conf.set(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
+ conf.set(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+ conf.set(FlinkOptions.VECTOR_COLUMNS, vectorColsConf);
+ CompactionUtil.setPartitionField(conf, metaClient);
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ try (HoodieFlinkWriteClient<?> writeClient =
FlinkWriteClients.createWriteClient(conf)) {
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ Option<String> clusteringInstantTime =
writeClient.scheduleClustering(Option.empty());
+ assertTrue(clusteringInstantTime.isPresent(), "The clustering plan
should be scheduled");
+
+ table.getMetaClient().reloadActiveTimeline();
+ HoodieTimeline timeline =
table.getActiveTimeline().filterPendingClusteringTimeline()
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
+ ClusteringUtils.getClusteringPlan(table.getMetaClient(),
timeline.lastInstant().get());
+ HoodieClusteringPlan clusteringPlan =
clusteringPlanOption.get().getRight();
+ assertTrue(clusteringPlan.getInputGroups().size() > 0, "The clustering
plan should contain input groups");
+
+ HoodieInstant instant =
INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime.get());
+ table.getActiveTimeline().transitionClusterRequestedToInflight(instant,
Option.empty());
+
+ HoodieSchema tableSchema =
StreamerUtil.getTableSchema(table.getMetaClient(), false);
+ DataType rowDataType =
HoodieSchemaConverter.convertToDataType(tableSchema);
+ RowType rowType = (RowType) rowDataType.getLogicalType();
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new
ClusteringPlanSourceFunction(clusteringInstantTime.get(), clusteringPlan, conf))
+ .name("clustering_source")
+ .uid("uid_vector_clustering_source")
+ .rebalance()
+ .transform("clustering_task",
+ TypeInformation.of(ClusteringCommitEvent.class),
+ new ClusteringOperator(conf, rowType))
+ .setParallelism(clusteringPlan.getInputGroups().size());
+
+ ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.get(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+ dataStream
+ .addSink(new ClusteringCommitSink(conf))
+ .name("clustering_commit")
+ .uid("uid_vector_clustering_commit")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_vector_clustering");
+
assertTrue(table.getMetaClient().reloadActiveTimeline().filterCompletedInstants().containsInstant(instant.requestedTime()));
+ }
+ }
+
+ private static String formatOptions(Map<String, String> options) {
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ if (builder.length() > 0) {
+ builder.append(",");
+ }
+ builder.append(" '").append(entry.getKey()).append("' =
'").append(entry.getValue()).append("'");
+ }
+ return builder.toString();
+ }
+
+ private static void assertCompactedVectorRows(TableEnvironment tableEnv) {
+ List<Row> rows = collect(tableEnv, "SELECT id, embedding, features, codes,
label, ts FROM vector_table ORDER BY id");
+ assertEquals(3, rows.size());
+
+ assertEquals("id1", rows.get(0).getField(0));
+ assertFloatArray(rows.get(0).getField(1), new float[] {9.0f, 9.9f});
+ assertDoubleArray(rows.get(0).getField(2), new double[] {90.0d, 90.9d});
+ assertByteArray(rows.get(0).getField(3), new byte[] {9, 8});
+ assertEquals("new1", rows.get(0).getField(4));
+ assertEquals(10L, rows.get(0).getField(5));
+
+ assertEquals("id2", rows.get(1).getField(0));
+ assertFloatArray(rows.get(1).getField(1), new float[] {2.0f, 2.2f});
+ assertDoubleArray(rows.get(1).getField(2), new double[] {20.0d, 20.2d});
+ assertByteArray(rows.get(1).getField(3), new byte[] {3, 4});
+ assertEquals("old2", rows.get(1).getField(4));
+
+ assertEquals("id3", rows.get(2).getField(0));
+ assertFloatArray(rows.get(2).getField(1), new float[] {3.0f, 3.3f});
+ assertDoubleArray(rows.get(2).getField(2), new double[] {30.0d, 30.3d});
+ assertByteArray(rows.get(2).getField(3), new byte[] {5, 6});
+ assertEquals("new3", rows.get(2).getField(4));
+ }
+
+ private static void assertClusteredVectorRows(TableEnvironment tableEnv) {
+ List<Row> rows = collect(tableEnv, "SELECT id, embedding, features, codes,
label, ts FROM vector_table ORDER BY id");
+ assertEquals(3, rows.size());
+
+ assertEquals("id1", rows.get(0).getField(0));
+ assertFloatArray(rows.get(0).getField(1), new float[] {1.0f, 1.1f});
+ assertDoubleArray(rows.get(0).getField(2), new double[] {10.0d, 10.1d});
+ assertByteArray(rows.get(0).getField(3), new byte[] {1, 2});
+ assertEquals("cluster1", rows.get(0).getField(4));
+
+ assertEquals("id2", rows.get(1).getField(0));
+ assertFloatArray(rows.get(1).getField(1), new float[] {2.0f, 2.2f});
+ assertDoubleArray(rows.get(1).getField(2), new double[] {20.0d, 20.2d});
+ assertByteArray(rows.get(1).getField(3), new byte[] {3, 4});
+ assertEquals("cluster2", rows.get(1).getField(4));
+
+ assertEquals("id3", rows.get(2).getField(0));
+ assertFloatArray(rows.get(2).getField(1), new float[] {3.0f, 3.3f});
+ assertDoubleArray(rows.get(2).getField(2), new double[] {30.0d, 30.3d});
+ assertByteArray(rows.get(2).getField(3), new byte[] {5, 6});
+ assertEquals("cluster3", rows.get(2).getField(4));
+ }
+
private static void execInsertSql(TableEnvironment tableEnv, String insert)
throws ExecutionException, InterruptedException {
TableResult tableResult = tableEnv.executeSql(insert);
tableResult.await();