This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7af4fb9acfae feat(flink): Support writing VECTOR columns for flink
writer (#18877)
7af4fb9acfae is described below
commit 7af4fb9acfae9572fe2bcd221e9799e525d915d1
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Jun 22 11:06:13 2026 +0800
feat(flink): Support writing VECTOR columns for flink writer (#18877)
* feat(flink): Support writing VECTOR columns for flink writer
---------
Co-authored-by: danny0405 <[email protected]>
---
.../io/storage/row/HoodieRowDataCreateHandle.java | 14 +-
.../row/HoodieRowDataParquetWriteSupport.java | 6 +
.../io/storage/row/RowDataParquetWriteSupport.java | 16 +-
.../storage/row/parquet/ParquetRowDataWriter.java | 57 +++-
.../row/parquet/ParquetSchemaConverter.java | 60 +++-
.../apache/hudi/util/HoodieSchemaConverter.java | 70 ++++-
.../apache/hudi/util/RowDataToAvroConverters.java | 5 +-
.../org/apache/hudi/util/VectorColumnParser.java | 130 ++++++++
.../apache/hudi/util/VectorConversionUtils.java | 35 +++
.../storage/row/TestHoodieRowDataCreateHandle.java | 68 ++---
.../TestHoodieRowDataParquetConfigInjector.java | 50 ++++
.../row/parquet/TestParquetSchemaConverter.java | 85 +++++-
.../hudi/util/TestHoodieSchemaConverter.java | 57 ++++
.../hudi/common/schema/HoodieSchemaUtils.java | 13 +
.../apache/hudi/configuration/FlinkOptions.java | 8 +
.../sink/bucket/BucketBulkInsertWriterHelper.java | 2 +-
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 21 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 5 +-
.../org/apache/hudi/table/HoodieTableSource.java | 5 +-
.../apache/hudi/table/catalog/HoodieCatalog.java | 13 +-
.../hudi/table/catalog/HoodieHiveCatalog.java | 7 +-
.../hudi/table/catalog/TableOptionProperties.java | 5 +-
.../apache/hudi/table/ITTestVectorDataSource.java | 332 +++++++++++++++++++++
.../hudi/utils/TestRowDataToAvroConverters.java | 67 ++++-
24 files changed, 1024 insertions(+), 107 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index e3a8445d6675..35953870957d 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -79,6 +80,8 @@ public class HoodieRowDataCreateHandle implements
Serializable {
private final String fileId;
private final boolean preserveHoodieMetadata;
private final boolean skipMetadataWrite;
+ // The schema (with meta fields appended when required) used to create the
file writer.
+ private final HoodieSchema writerSchema;
private final HoodieStorage storage;
protected final WriteStatus writeStatus;
private final HoodieRecordLocation newRecordLocation;
@@ -90,7 +93,7 @@ public class HoodieRowDataCreateHandle implements
Serializable {
public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig
writeConfig, String partitionPath, String fileId,
String instantTime, int taskPartitionId,
long taskId, long taskEpochId,
- RowType rowType, boolean
preserveHoodieMetadata, boolean skipMetadataWrite) {
+ HoodieSchema schema, boolean
preserveHoodieMetadata, boolean skipMetadataWrite) {
this.partitionPath = partitionPath;
this.table = table;
this.writeConfig = writeConfig;
@@ -102,10 +105,11 @@ public class HoodieRowDataCreateHandle implements
Serializable {
this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
this.preserveHoodieMetadata = preserveHoodieMetadata;
this.skipMetadataWrite = skipMetadataWrite;
+ this.writerSchema = schema;
this.currTimer = HoodieTimer.start();
this.storage = table.getStorage();
this.path = makeNewPath(partitionPath);
- this.eventTimeFieldGetter = initEventTimeFieldGetter(writeConfig, rowType);
+ this.eventTimeFieldGetter = initEventTimeFieldGetter(writeConfig,
HoodieSchemaConverter.convertToRowType(writerSchema));
this.writeStatus = new WriteStatus(table.shouldTrackSuccessRecords(),
writeConfig.getWriteStatusFailureFraction());
@@ -122,7 +126,7 @@ public class HoodieRowDataCreateHandle implements
Serializable {
table.getPartitionMetafileFormat());
partitionMetadata.trySave();
createMarkerFile(partitionPath,
FSUtils.makeBaseFileName(this.instantTime, getWriteToken(), this.fileId,
table.getBaseFileExtension()));
- this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType,
this.instantTime);
+ this.fileWriter = createNewFileWriter(path, table, writeConfig,
this.instantTime);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize file writer for
path " + path, e);
}
@@ -292,7 +296,7 @@ public class HoodieRowDataCreateHandle implements
Serializable {
}
protected HoodieRowDataFileWriter createNewFileWriter(
- Path path, HoodieTable hoodieTable, HoodieWriteConfig config, RowType
rowType, String instantTime)
+ Path path, HoodieTable hoodieTable, HoodieWriteConfig config, String
instantTime)
throws IOException {
StoragePath storagePath = new StoragePath(path.toUri());
return (HoodieRowDataFileWriter) HoodieFileWriterFactory.getFileWriter(
@@ -300,7 +304,7 @@ public class HoodieRowDataCreateHandle implements
Serializable {
storagePath,
hoodieTable.getStorage(),
config,
- HoodieSchemaConverter.convertToSchema(rowType).getNonNullType(),
+ writerSchema,
hoodieTable.getTaskContextSupplier(),
HoodieRecord.HoodieRecordType.FLINK);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index 14ed278b70a2..b3c70c0c6186 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -52,6 +53,11 @@ public class HoodieRowDataParquetWriteSupport extends
RowDataParquetWriteSupport
Map<String, String> extraMetadata =
bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
.orElse(Collections.emptyMap());
+ String vectorColumnsMetadata =
HoodieSchema.buildVectorColumnsMetadataValue(schema);
+ if (!vectorColumnsMetadata.isEmpty()) {
+ extraMetadata = new HashMap<>(extraMetadata);
+ extraMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY,
vectorColumnsMetadata);
+ }
return new WriteSupport.FinalizedWriteContext(extraMetadata);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
index 01d2806e7633..5043a6b9af66 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
@@ -22,10 +22,8 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.io.storage.row.parquet.ParquetRowDataWriter;
import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
-import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
@@ -38,21 +36,21 @@ import java.util.HashMap;
*/
public class RowDataParquetWriteSupport extends WriteSupport<RowData> {
- private final RowType rowType;
- private final MessageType schema;
+ protected final HoodieSchema schema;
+ private final MessageType fileSchema;
private ParquetRowDataWriter writer;
protected final Configuration hadoopConf;
- public RowDataParquetWriteSupport(HoodieSchema hoodieSchema, Configuration
config) {
+ public RowDataParquetWriteSupport(HoodieSchema schema, Configuration config)
{
super();
- this.rowType = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+ this.schema = schema;
this.hadoopConf = new Configuration(config);
- this.schema =
ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType);
+ this.fileSchema =
ParquetSchemaConverter.convertToParquetMessageType("flink_schema", schema);
}
@Override
public WriteContext init(Configuration configuration) {
- return new WriteContext(schema, new HashMap<>());
+ return new WriteContext(fileSchema, new HashMap<>());
}
@Override
@@ -62,7 +60,7 @@ public class RowDataParquetWriteSupport extends
WriteSupport<RowData> {
hadoopConf.getBoolean(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue());
- this.writer = new ParquetRowDataWriter(recordConsumer, rowType, schema,
utcTimestamp);
+ this.writer = new ParquetRowDataWriter(recordConsumer, utcTimestamp,
schema);
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index b3b612506b95..4a3c13064685 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -18,7 +18,12 @@
package org.apache.hudi.io.storage.row.parquet;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.VectorConversionUtils;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalDataUtils;
@@ -35,7 +40,6 @@ import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.GroupType;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -64,16 +68,17 @@ public class ParquetRowDataWriter {
public ParquetRowDataWriter(
RecordConsumer recordConsumer,
- RowType rowType,
- GroupType schema,
- boolean utcTimestamp) {
+ boolean utcTimestamp,
+ HoodieSchema schema) {
this.recordConsumer = recordConsumer;
this.utcTimestamp = utcTimestamp;
+ RowType rowType = HoodieSchemaConverter.convertToRowType(schema);
this.filedWriters = new FieldWriter[rowType.getFieldCount()];
this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
for (int i = 0; i < rowType.getFieldCount(); i++) {
- this.filedWriters[i] = createWriter(rowType.getTypeAt(i));
+ HoodieSchema fieldSchema = HoodieSchemaUtils.getFieldSchema(schema,
fieldNames[i]);
+ this.filedWriters[i] = createWriter(rowType.getTypeAt(i), fieldSchema);
}
}
@@ -97,7 +102,9 @@ public class ParquetRowDataWriter {
recordConsumer.endMessage();
}
- private FieldWriter createWriter(LogicalType t) {
+ private FieldWriter createWriter(LogicalType t, HoodieSchema oriFieldSchema)
{
+ // strip the nullable wrapper so the element/key/value/record sub-schemas
can be resolved directly
+ HoodieSchema fieldSchema = oriFieldSchema.getNonNullType();
switch (t.getTypeRoot()) {
case CHAR:
case VARCHAR:
@@ -141,21 +148,27 @@ public class ParquetRowDataWriter {
return new Timestamp96Writer(tsLtzPrecision);
}
case ARRAY:
+ if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+ return new VectorWriter((HoodieSchema.Vector) fieldSchema);
+ }
ArrayType arrayType = (ArrayType) t;
LogicalType elementType = arrayType.getElementType();
- FieldWriter elementWriter = createWriter(elementType);
+ FieldWriter elementWriter = createWriter(elementType,
fieldSchema.getElementType());
return new ArrayWriter(elementWriter);
case MAP:
MapType mapType = (MapType) t;
LogicalType keyType = mapType.getKeyType();
LogicalType valueType = mapType.getValueType();
- FieldWriter keyWriter = createWriter(keyType);
- FieldWriter valueWriter = createWriter(valueType);
+ FieldWriter keyWriter = createWriter(keyType,
fieldSchema.getKeyType());
+ FieldWriter valueWriter = createWriter(valueType,
fieldSchema.getValueType());
return new MapWriter(keyWriter, valueWriter);
case ROW:
RowType rowType = (RowType) t;
+ ValidationUtils.checkArgument(fieldSchema.getType() ==
HoodieSchemaType.RECORD || fieldSchema.getType() == HoodieSchemaType.BLOB,
+ "Hoodie schema should be RECORD or BLOB type.");
FieldWriter[] fieldWriters = rowType.getFields().stream()
-
.map(RowType.RowField::getType).map(this::createWriter).toArray(FieldWriter[]::new);
+ .map(field -> createWriter(field.getType(),
HoodieSchemaUtils.getFieldSchema(fieldSchema, field.getName())))
+ .toArray(FieldWriter[]::new);
String[] fieldNames = rowType.getFields().stream()
.map(RowType.RowField::getName).toArray(String[]::new);
return new RowWriter(fieldNames, fieldWriters);
@@ -274,6 +287,29 @@ public class ParquetRowDataWriter {
}
}
+ /**
+  * Field writer for VECTOR columns: the Flink array read from the row (or from the parent
+  * array) is encoded with {@link VectorConversionUtils#encodeVectorArrayData} and handed to
+  * the record consumer as one binary value.
+  */
+ private class VectorWriter implements FieldWriter {
+   // VECTOR schema passed to the encoder for every value of this column
+   private final HoodieSchema.Vector schema;
+
+   private VectorWriter(HoodieSchema.Vector schema) {
+     this.schema = schema;
+   }
+
+   @Override
+   public void write(RowData row, int ordinal) {
+     emit(row.getArray(ordinal));
+   }
+
+   @Override
+   public void write(ArrayData array, int ordinal) {
+     emit(array.getArray(ordinal));
+   }
+
+   /** Encodes the given elements and adds the resulting bytes to the record consumer. */
+   private void emit(ArrayData elements) {
+     byte[] encoded = VectorConversionUtils.encodeVectorArrayData(elements, schema);
+     recordConsumer.addBinary(Binary.fromReusedByteArray(encoded));
+   }
+ }
+
private class IntWriter implements FieldWriter {
@Override
@@ -619,4 +655,3 @@ public class ParquetRowDataWriter {
}
}
}
-
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index dd19100f8ccd..5bad03d9253e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -20,7 +20,10 @@ package org.apache.hudi.io.storage.row.parquet;
import org.apache.hudi.adapter.DataTypeAdapter;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.util.HoodieSchemaConverter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.api.DataTypes;
@@ -205,12 +208,32 @@ public class ParquetSchemaConverter {
return Pair.of(keyValue.getType(MAP_KEY_NAME),
keyValue.getType(MAP_VALUE_NAME));
}
+ /**
+ * Converts a Flink row type to a Parquet message type.
+ *
+ * <p>This overload preserves the pre-VECTOR public API, but converting from
+ * {@link RowType} to {@link HoodieSchema} loses VECTOR logical metadata
such as dimension and
+ * element type. Prefer {@link #convertToParquetMessageType(String,
HoodieSchema)} whenever the
+ * caller already has a metadata-aware {@link HoodieSchema}.
+ */
public static MessageType convertToParquetMessageType(String name, RowType
rowType) {
+ HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(rowType);
+ return convertToParquetMessageType(name, hoodieSchema);
+ }
+
+ public static MessageType convertToParquetMessageType(String name,
HoodieSchema oriRowSchema) {
+ HoodieSchema rowSchema = oriRowSchema.getNonNullType();
+ RowType rowType = HoodieSchemaConverter.convertToRowType(rowSchema);
Type[] types = new Type[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); i++) {
String fieldName = rowType.getFieldNames().get(i);
LogicalType fieldType = rowType.getTypeAt(i);
- types[i] = convertToParquetType(fieldName, fieldType,
fieldType.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED);
+ HoodieSchema fieldSchema = HoodieSchemaUtils.getFieldSchema(rowSchema,
fieldName);
+ types[i] = convertToParquetType(
+ fieldName,
+ fieldType,
+ fieldType.isNullable() ? Type.Repetition.OPTIONAL :
Type.Repetition.REQUIRED,
+ fieldSchema);
}
return new MessageType(name, types);
}
@@ -264,7 +287,8 @@ public class ParquetSchemaConverter {
}
private static Type convertToParquetType(
- String name, LogicalType type, Type.Repetition repetition) {
+ String name, LogicalType type, Type.Repetition repetition, HoodieSchema
oriFieldSchema) {
+ HoodieSchema fieldSchema = oriFieldSchema.getNonNullType();
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
@@ -338,6 +362,15 @@ public class ParquetSchemaConverter {
.named(name);
}
case ARRAY:
+ if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+ HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) fieldSchema;
+ int fixedSize = Math.multiplyExact(
+ vectorSchema.getDimension(),
+ vectorSchema.getVectorElementType().getElementSize());
+ return
Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
repetition)
+ .length(fixedSize)
+ .named(name);
+ }
// align with Spark And Avro regarding the standard mode array type,
see:
//
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
//
@@ -350,7 +383,13 @@ public class ParquetSchemaConverter {
Type.Repetition eleRepetition =
arrayType.getElementType().isNullable() ? Type.Repetition.OPTIONAL
: Type.Repetition.REQUIRED;
return ConversionPatterns.listOfElements(
- repetition, name, convertToParquetType("element",
arrayType.getElementType(), eleRepetition));
+ repetition,
+ name,
+ convertToParquetType(
+ "element",
+ arrayType.getElementType(),
+ eleRepetition,
+ fieldSchema.getElementType()));
case MAP:
// <map-repetition> group <name> (MAP) {
// repeated group key_value {
@@ -366,15 +405,24 @@ public class ParquetSchemaConverter {
.addField(
Types
.repeatedGroup()
- .addField(convertToParquetType("key", keyType,
Type.Repetition.REQUIRED))
- .addField(convertToParquetType("value", valueType,
valueType.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED))
+ .addField(convertToParquetType(
+ "key", keyType, Type.Repetition.REQUIRED,
fieldSchema.getKeyType()))
+ .addField(convertToParquetType(
+ "value",
+ valueType,
+ valueType.isNullable() ? Type.Repetition.OPTIONAL :
Type.Repetition.REQUIRED,
+ fieldSchema.getValueType()))
.named("key_value"))
.named(name);
case ROW:
RowType rowType = (RowType) type;
Types.GroupBuilder<GroupType> builder = Types.buildGroup(repetition);
rowType.getFields().forEach(field -> builder
- .addField(convertToParquetType(field.getName(), field.getType(),
field.getType().isNullable() ? Type.Repetition.OPTIONAL :
Type.Repetition.REQUIRED)));
+ .addField(convertToParquetType(
+ field.getName(),
+ field.getType(),
+ field.getType().isNullable() ? Type.Repetition.OPTIONAL :
Type.Repetition.REQUIRED,
+ HoodieSchemaUtils.getFieldSchema(fieldSchema,
field.getName()))));
return builder.named(name);
default:
if (DataTypeAdapter.isVariantType(type)) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index c859675488ad..af992d1a18f4 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
@@ -44,7 +45,9 @@ import org.apache.flink.table.types.logical.TimestampType;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -79,6 +82,38 @@ public class HoodieSchemaConverter {
* @return HoodieSchema matching this logical type
*/
public static HoodieSchema convertToSchema(LogicalType logicalType, String
rowName) {
+ return convertToSchema(logicalType, rowName, Collections.emptyMap());
+ }
+
+ /**
+  * Converts a Flink LogicalType into a HoodieSchema with top-level VECTOR columns.
+  *
+  * <p>The vector column option uses {@code colName[:dimension]} entries separated by commas.
+  * The dimension defaults to 128. The vector element type is inferred from the Flink array
+  * element type: FLOAT, DOUBLE, or TINYINT.
+  *
+  * <p>{@link VectorColumnParser#parse(String)} lower-cases the configured column names, while
+  * validation and conversion look columns up by the field names exactly as declared in the row
+  * type. The parsed names are therefore mapped back onto the declared field names first, so a
+  * field declared as {@code Embedding} can be configured as {@code embedding:4}.
+  *
+  * @param logicalType Flink logical type
+  * @param rowName the record name
+  * @param vectorColumns comma-separated vector column descriptors, or null/empty
+  * @return HoodieSchema matching this logical type
+  */
+ public static HoodieSchema convertToSchema(
+     LogicalType logicalType,
+     String rowName,
+     String vectorColumns) {
+   Map<String, Integer> vectorColumnMap = vectorColumns == null || vectorColumns.trim().isEmpty()
+       ? Collections.emptyMap()
+       : resolveVectorColumnNames(logicalType, VectorColumnParser.parse(vectorColumns));
+   validateVectorColumns(logicalType, vectorColumnMap);
+   return convertToSchema(logicalType, rowName, vectorColumnMap);
+ }
+
+ /**
+  * Re-keys the parsed (lower-cased) vector columns by the declared field names of the row type.
+  *
+  * <p>An exact match wins; otherwise the first field that equals the configured name ignoring
+  * case is used. Names that match no field are kept unchanged so that
+  * {@code validateVectorColumns} still reports them. Non-ROW types are returned untouched.
+  *
+  * @param logicalType Flink logical type the columns are configured for
+  * @param vectorColumns parsed vector columns (lower-cased column name to dimension)
+  * @return vector columns keyed by the declared field names, in the same order
+  */
+ private static Map<String, Integer> resolveVectorColumnNames(
+     LogicalType logicalType,
+     Map<String, Integer> vectorColumns) {
+   if (vectorColumns.isEmpty() || !(logicalType instanceof RowType)) {
+     return vectorColumns;
+   }
+   List<String> fieldNames = ((RowType) logicalType).getFieldNames();
+   // fully qualified: the import block of this file is not entirely visible in this diff
+   Map<String, Integer> resolved = new java.util.LinkedHashMap<>();
+   vectorColumns.forEach((column, dimension) -> {
+     String declaredName = fieldNames.contains(column)
+         ? column
+         : fieldNames.stream().filter(column::equalsIgnoreCase).findFirst().orElse(column);
+     resolved.put(declaredName, dimension);
+   });
+   return resolved;
+ }
+
+ private static HoodieSchema convertToSchema(
+ LogicalType logicalType,
+ String rowName,
+ Map<String, Integer> vectorColumns) {
+ ValidationUtils.checkArgument(vectorColumns.isEmpty() || logicalType
instanceof RowType,
+ "VECTOR columns can only be configured for top-level ROW schemas.");
+
int precision;
boolean nullable = logicalType.isNullable();
HoodieSchema schema;
@@ -196,8 +231,13 @@ public class HoodieSchemaConverter {
String fieldName = fieldNames.get(i);
LogicalType fieldType = rowType.getTypeAt(i);
- // Recursive call for field schema
- HoodieSchema fieldSchema = convertToSchema(fieldType, rowName + "."
+ fieldName);
+ HoodieSchema fieldSchema;
+ if (vectorColumns.containsKey(fieldName)) {
+ fieldSchema = VectorColumnParser.convertVectorField(fieldName,
fieldType, vectorColumns.get(fieldName));
+ } else {
+ // Recursive call for field schema
+ fieldSchema = convertToSchema(fieldType, rowName + "." +
fieldName, Collections.emptyMap());
+ }
// Create field with or without default value
HoodieSchemaField field;
@@ -215,13 +255,13 @@ public class HoodieSchemaConverter {
case MULTISET:
case MAP:
LogicalType valueType = extractValueTypeForMap(logicalType);
- HoodieSchema valueSchema = convertToSchema(valueType, rowName);
+ HoodieSchema valueSchema = convertToSchema(valueType, rowName,
Collections.emptyMap());
schema = HoodieSchema.createMap(valueSchema);
break;
case ARRAY:
ArrayType arrayType = (ArrayType) logicalType;
- HoodieSchema elementSchema =
convertToSchema(arrayType.getElementType(), rowName);
+ HoodieSchema elementSchema =
convertToSchema(arrayType.getElementType(), rowName, Collections.emptyMap());
schema = HoodieSchema.createArray(elementSchema);
break;
@@ -238,6 +278,28 @@ public class HoodieSchemaConverter {
return nullable ? HoodieSchema.createNullable(schema) : schema;
}
+ /**
+  * Validates that all configured VECTOR columns resolve to top-level fields of the row schema.
+  *
+  * <p>Invoked by {@link #convertToSchema(LogicalType, String, String)} before schema inference so
+  * an unknown column is rejected instead of being silently ignored during conversion. Column
+  * names are compared exactly against the field names of the row type.
+  *
+  * @param logicalType Flink logical type
+  * @param vectorColumns parsed vector columns (column name to dimension), may be empty
+  * @throws IllegalArgumentException if vector columns are configured for a non-ROW type, or if a
+  *     configured column is not a top-level field of the row type
+  */
+ private static void validateVectorColumns(LogicalType logicalType, Map<String, Integer> vectorColumns) {
+   if (vectorColumns.isEmpty()) {
+     return;
+   }
+   // Check the type before casting: otherwise a non-ROW type fails with a ClassCastException
+   // instead of a descriptive error.
+   if (!(logicalType instanceof RowType)) {
+     throw new IllegalArgumentException("VECTOR columns can only be configured for top-level ROW schemas.");
+   }
+   List<String> fieldNames = ((RowType) logicalType).getFieldNames();
+   for (String vectorColumn : vectorColumns.keySet()) {
+     if (!fieldNames.contains(vectorColumn)) {
+       throw new IllegalArgumentException("VECTOR column '" + vectorColumn + "' does not exist in the table schema.");
+     }
+   }
+ }
+
/**
* Extracts value type for map conversion.
* Maps must have string keys for Avro/HoodieSchema compatibility.
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index 050ad483de10..fcd435532ede 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -331,6 +331,10 @@ public class RowDataToAvroConverters {
@Override
public Object convert(HoodieSchema schema, Object object) {
+ if (schema.getType() == HoodieSchemaType.VECTOR) {
+ HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+ return new GenericData.Fixed(schema.toAvroSchema(),
VectorConversionUtils.encodeVectorArrayData((ArrayData) object, vectorSchema));
+ }
final HoodieSchema elementSchema = schema.getElementType();
ArrayData arrayData = (ArrayData) object;
List<Object> list = new ArrayList<>();
@@ -394,4 +398,3 @@ public class RowDataToAvroConverters {
};
}
}
-
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorColumnParser.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorColumnParser.java
new file mode 100644
index 000000000000..68724be54cb6
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorColumnParser.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Helper for the {@code hoodie.vector.columns} table option: parses the option value and turns
+ * configured top-level Flink array fields into Hoodie VECTOR schemas.
+ *
+ * <p>The option is a comma-separated list of {@code colName[:dimension]} entries, for example
+ * {@code embedding:4,features:3,codes:4}. When the dimension is omitted it defaults to
+ * {@value #DEFAULT_VECTOR_DIMENSION}. Column names are lower-cased while parsing. The vector
+ * element type follows the Flink array element type: {@code ARRAY<FLOAT>}, {@code ARRAY<DOUBLE>},
+ * and {@code ARRAY<TINYINT>} become FLOAT, DOUBLE, and INT8 respectively.
+ *
+ * <p>Only the descriptor syntax is checked here (well-formed entries, no duplicate columns,
+ * positive dimensions); this class does not check that the columns exist in a table schema.
+ */
+public class VectorColumnParser {
+
+  public static final int DEFAULT_VECTOR_DIMENSION = 128;
+
+  private VectorColumnParser() {
+  }
+
+  /**
+   * Parses the descriptor string into a map from lower-cased column name to vector dimension.
+   * Entries keep their declaration order; blank entries between commas are skipped.
+   *
+   * @param vectorColumns comma-separated {@code colName[:dimension]} descriptors
+   * @return map from lower-cased column name to dimension
+   * @throws IllegalArgumentException if a descriptor is malformed, duplicated, or has a non-positive dimension
+   */
+  public static Map<String, Integer> parse(String vectorColumns) {
+    Map<String, Integer> dimensionsByColumn = new LinkedHashMap<>();
+    for (String token : vectorColumns.split(",")) {
+      String descriptor = token.trim();
+      if (descriptor.isEmpty()) {
+        continue;
+      }
+      // limit -1 keeps trailing empty segments, so "col:" is seen as two segments
+      String[] segments = descriptor.split(":", -1);
+      String columnName = segments[0].trim();
+      if (segments.length > 2 || columnName.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Invalid VECTOR column descriptor '" + descriptor + "'. Expected format: columnName[:dimension].");
+      }
+      String key = columnName.toLowerCase(Locale.ROOT);
+      if (dimensionsByColumn.containsKey(key)) {
+        throw new IllegalArgumentException("Duplicate VECTOR column descriptor for column: " + columnName);
+      }
+      int dimension = segments.length == 2
+          ? parseDimension(descriptor, columnName, segments[1].trim())
+          : DEFAULT_VECTOR_DIMENSION;
+      if (dimension <= 0) {
+        throw new IllegalArgumentException("VECTOR dimension must be positive for column '" + columnName + "': " + dimension);
+      }
+      dimensionsByColumn.put(key, dimension);
+    }
+    return dimensionsByColumn;
+  }
+
+  /** Parses the explicit dimension of one descriptor, rejecting empty or non-integer text. */
+  private static int parseDimension(String descriptor, String columnName, String dimensionText) {
+    if (dimensionText.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Invalid VECTOR column descriptor '" + descriptor + "'. Dimension must not be empty.");
+    }
+    try {
+      return Integer.parseInt(dimensionText);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid VECTOR dimension for column '" + columnName + "': " + dimensionText, e);
+    }
+  }
+
+  /**
+   * Builds the Hoodie VECTOR schema for a Flink array field with the given dimension.
+   *
+   * @param fieldName the column name (used in error messages)
+   * @param fieldType the Flink field type, which must be an {@link ArrayType}
+   * @param dimension the configured vector dimension
+   * @return the VECTOR {@link HoodieSchema}, wrapped as nullable when the field type is nullable
+   * @throws IllegalArgumentException if the field is not an array or its element type is unsupported
+   */
+  public static HoodieSchema convertVectorField(String fieldName, LogicalType fieldType, int dimension) {
+    if (!(fieldType instanceof ArrayType)) {
+      throw new IllegalArgumentException(
+          "VECTOR column '" + fieldName + "' must be declared as ARRAY<FLOAT>, ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got: "
+              + fieldType.asSummaryString());
+    }
+    LogicalType arrayElementType = ((ArrayType) fieldType).getElementType();
+    HoodieSchema vector = HoodieSchema.createVector(dimension, inferVectorElementType(fieldName, arrayElementType));
+    if (fieldType.isNullable()) {
+      return HoodieSchema.createNullable(vector);
+    }
+    return vector;
+  }
+
+  /** Maps the Flink array element type to the vector element type, rejecting anything else. */
+  private static HoodieSchema.Vector.VectorElementType inferVectorElementType(String fieldName, LogicalType elementType) {
+    switch (elementType.getTypeRoot()) {
+      case FLOAT:
+        return HoodieSchema.Vector.VectorElementType.FLOAT;
+      case DOUBLE:
+        return HoodieSchema.Vector.VectorElementType.DOUBLE;
+      case TINYINT:
+        return HoodieSchema.Vector.VectorElementType.INT8;
+      default:
+        throw new IllegalArgumentException(
+            "VECTOR column '" + fieldName + "' must use ARRAY<FLOAT>, ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got ARRAY<"
+                + elementType.asSummaryString() + ">.");
+    }
+  }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
index 402d72372931..c3721bae71ea 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
@@ -21,11 +21,13 @@ package org.apache.hudi.util;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.HoodieVectorUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -35,6 +37,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
@@ -273,6 +276,38 @@ public final class VectorConversionUtils {
throw new UnsupportedOperationException("Unsupported decoded vector array
type: " + vectorArray.getClass());
}
+ /**
+  * Encodes Flink array data into the canonical Hudi VECTOR fixed-bytes representation.
+  *
+  * <p>Elements are written back to back, in index order, into a buffer of
+  * {@code dimension * elementSize} bytes that uses
+  * {@code HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER}.
+  *
+  * @param arrayData    the Flink array holding the vector elements
+  * @param vectorSchema the VECTOR schema that supplies the expected dimension and element type
+  * @return the encoded vector bytes
+  * @throws IllegalArgumentException if any element of the array is null; a size that differs from
+  *                                  the schema dimension is rejected through {@code ValidationUtils.checkArgument}
+  * @throws ArithmeticException if {@code dimension * elementSize} overflows an {@code int}
+  * @throws UnsupportedOperationException if the element type is not FLOAT, DOUBLE or INT8
+  */
+ public static byte[] encodeVectorArrayData(ArrayData arrayData, HoodieSchema.Vector vectorSchema) {
+   int dimension = vectorSchema.getDimension();
+   HoodieSchema.Vector.VectorElementType elementType = vectorSchema.getVectorElementType();
+   ValidationUtils.checkArgument(arrayData.size() == dimension,
+       () -> "Vector dimension mismatch: schema expects " + dimension + " elements but got " + arrayData.size());
+   // The fixed-width encoding has no slot for a missing value and the primitive getters used below
+   // are not meaningful at null positions, so reject null elements with a clear message up front.
+   for (int i = 0; i < dimension; i++) {
+     if (arrayData.isNullAt(i)) {
+       throw new IllegalArgumentException(
+           "Vector element at index " + i + " is null; VECTOR values must not contain null elements");
+     }
+   }
+   // multiplyExact guards against silent int overflow for very large dimensions.
+   int bufferSize = Math.multiplyExact(dimension, elementType.getElementSize());
+   ByteBuffer buffer = ByteBuffer.allocate(bufferSize).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+   switch (elementType) {
+     case FLOAT:
+       for (int i = 0; i < dimension; i++) {
+         buffer.putFloat(arrayData.getFloat(i));
+       }
+       break;
+     case DOUBLE:
+       for (int i = 0; i < dimension; i++) {
+         buffer.putDouble(arrayData.getDouble(i));
+       }
+       break;
+     case INT8:
+       for (int i = 0; i < dimension; i++) {
+         buffer.put(arrayData.getByte(i));
+       }
+       break;
+     default:
+       throw new UnsupportedOperationException("Unsupported VECTOR element type: " + elementType);
+   }
+   return buffer.array();
+ }
+
/**
* Validates that a Flink ARRAY logical type uses the element type required
by the VECTOR schema.
*
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
index ecd3ab67d4a1..b8d29dfb2c34 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
@@ -20,24 +20,22 @@ package org.apache.hudi.io.storage.row;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodiePayloadProps;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -69,27 +67,6 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
cleanupResources();
}
- /**
- * Adds Hoodie metadata fields to the row type, matching production behavior
in BulkInsertWriterHelper.
- */
- private static RowType addMetadataFields(RowType rowType, boolean
withOperationField) {
- List<RowType.RowField> mergedFields = new ArrayList<>();
-
- LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
- mergedFields.add(new
RowType.RowField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, metadataFieldType));
- mergedFields.add(new
RowType.RowField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, metadataFieldType));
- mergedFields.add(new
RowType.RowField(HoodieRecord.RECORD_KEY_METADATA_FIELD, metadataFieldType));
- mergedFields.add(new
RowType.RowField(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
metadataFieldType));
- mergedFields.add(new
RowType.RowField(HoodieRecord.FILENAME_METADATA_FIELD, metadataFieldType));
-
- if (withOperationField) {
- mergedFields.add(new
RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType));
- }
-
- mergedFields.addAll(rowType.getFields());
- return new RowType(mergedFields);
- }
-
@Test
public void testEventTimeFieldIndexWithDoubleType() throws Exception {
// Schema with DOUBLE event_time field
@@ -101,8 +78,6 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- // Add metadata fields like production code does
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
Properties props = new Properties();
props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"event_time");
@@ -110,13 +85,15 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
.withPath(basePath)
.withProperties(props)
.withEmbeddedTimelineServerEnabled(false)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
.build();
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context,
metaClient);
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
assertNotNull(handle);
// Verify the handle was created successfully with event time configured
@@ -133,7 +110,6 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
Properties props = new Properties();
props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"event_time");
@@ -141,6 +117,7 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
.withPath(basePath)
.withProperties(props)
.withEmbeddedTimelineServerEnabled(false)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
.build();
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context,
metaClient);
@@ -148,7 +125,8 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
// Should create handle but log warning about unsupported type
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
assertNotNull(handle);
}
@@ -163,7 +141,6 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
Properties props = new Properties();
props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"event_time");
@@ -171,6 +148,7 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
.withPath(basePath)
.withProperties(props)
.withEmbeddedTimelineServerEnabled(false)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
.build();
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context,
metaClient);
@@ -178,7 +156,8 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
// Should create handle but log warning about missing field
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
assertNotNull(handle);
}
@@ -194,11 +173,11 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withEmbeddedTimelineServerEnabled(false)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
// No event time field configured
.build();
@@ -206,7 +185,8 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
assertNotNull(handle);
}
@@ -222,13 +202,13 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
Properties props = new Properties();
props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"event_time");
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withProperties(props)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
.withEmbeddedTimelineServerEnabled(false)
.build();
@@ -236,7 +216,8 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
// Create test records with event time
double eventTimeSeconds1 = 1729512000.0; // 2024-10-21 12:00:00
@@ -293,7 +274,6 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
Properties props = new Properties();
props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"event_time");
@@ -301,13 +281,15 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
.withPath(basePath)
.withProperties(props)
.withEmbeddedTimelineServerEnabled(false)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
.build();
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context,
metaClient);
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
long eventTimeMillis1 = 1729512000000L;
long eventTimeMillis2 = 1729515600000L;
@@ -350,7 +332,6 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
Properties props = new Properties();
props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"event_time");
@@ -358,13 +339,15 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
.withPath(basePath)
.withProperties(props)
.withEmbeddedTimelineServerEnabled(false)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
.build();
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context,
metaClient);
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
// Create record with null event time
GenericRowData row = new GenericRowData(5);
@@ -400,11 +383,11 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
).notNull();
RowType baseRowType = (RowType) dataType.getLogicalType();
- RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withEmbeddedTimelineServerEnabled(false)
+
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
// No event time field configured
.build();
@@ -412,7 +395,8 @@ public class TestHoodieRowDataCreateHandle extends
HoodieFlinkClientTestHarness
HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
- TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false,
false);
+ TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
false), false, false);
// Create record with timestamp
GenericRowData row = new GenericRowData(5);
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
index 67bf8e09cb6d..37ee8b2a59ed 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
@@ -22,11 +22,15 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.LocalTaskContextSupplier;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.testutils.DisableDictionaryInjector;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieParquetConfigInjector;
import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -35,6 +39,7 @@ import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
@@ -46,13 +51,17 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.PrimitiveType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.Arrays;
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.FLINK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -221,4 +230,45 @@ public class TestHoodieRowDataParquetConfigInjector
extends HoodieFlinkClientTes
// Verify the parquet file was created
assertTrue(storage.exists(parquetPath));
}
+
+ @Test
+ public void testGetFileWriterPropagatesHoodieSchemaForVectorColumns() throws Exception {
+   final String instantTime = "104";
+   HoodieStorage storage = HoodieTestUtils.getStorage(tmpDir.toString());
+   final StoragePath parquetPath = new StoragePath(
+       basePath + "/partition/path/test_vector_schema_" + instantTime + ".parquet");
+
+   // Record schema with a plain INT column and a 2-dimension vector column.
+   HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+       "vector_record",
+       null,
+       null,
+       Arrays.asList(
+           HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+           HoodieSchemaField.of("embedding", HoodieSchema.createVector(2))));
+   HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+       .withPath(basePath)
+       .build();
+
+   HoodieFileWriter writer = HoodieFileWriterFactory.getFileWriter(
+       instantTime, parquetPath, storage, config, hoodieSchema, new LocalTaskContextSupplier(), FLINK);
+   try {
+     assertTrue(writer instanceof HoodieRowDataParquetWriter);
+
+     GenericRowData row = new GenericRowData(2);
+     row.setField(0, 1);
+     row.setField(1, new GenericArrayData(new float[] {1.0f, 2.0f}));
+     ((HoodieRowDataParquetWriter) writer).write(row);
+   } finally {
+     // Always release the writer, even when the assertion or the write above fails.
+     writer.close();
+   }
+
+   Configuration hadoopConf = new Configuration();
+   Path hadoopPath = new Path(parquetPath.toUri());
+   // try-with-resources so the reader is released even if reading the footer throws.
+   final ParquetMetadata metadata;
+   try (ParquetFileReader reader = ParquetFileReader.open(hadoopConf, hadoopPath)) {
+     metadata = reader.getFooter();
+   }
+
+   // The vector column is expected as an 8-byte FIXED_LEN_BYTE_ARRAY plus a footer entry describing it.
+   PrimitiveType embeddingType = metadata.getFileMetaData().getSchema().getType("embedding").asPrimitiveType();
+   assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, embeddingType.getPrimitiveTypeName());
+   assertEquals(8, embeddingType.getTypeLength());
+   assertEquals("embedding:VECTOR(2)",
+       metadata.getFileMetaData().getKeyValueMetaData().get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY));
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 403b84068496..66c6f9e86263 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -20,7 +20,10 @@ package org.apache.hudi.io.storage.row.parquet;
import org.apache.hudi.adapter.DataTypeAdapter;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.row.HoodieRowDataParquetWriteSupport;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
@@ -38,6 +41,7 @@ import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
@@ -48,6 +52,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -84,7 +89,7 @@ public class TestParquetSchemaConverter {
new MapType(
new VarCharType(VarCharType.MAX_LENGTH),
new VarCharType(VarCharType.MAX_LENGTH)),
- new MapType(new IntType(), new BooleanType()),
+ new MapType(new VarCharType(VarCharType.MAX_LENGTH), new
BooleanType()),
RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
private static final RowType NESTED_ARRAY_MAP_TYPE =
@@ -121,7 +126,9 @@ public class TestParquetSchemaConverter {
void testParquetFlinkTypeConverting() {
MessageType messageType =
ParquetSchemaConverter.convertToParquetMessageType("flink_schema", ROW_TYPE);
RowType rowType = ParquetSchemaConverter.convertToRowType(messageType);
- assertThat(rowType, is(ROW_TYPE));
+ // The parquet schema is derived from the avro-based HoodieSchema, which
has no byte/short
+ // types, so TINYINT/SMALLINT (including as array elements) are normalized
to INT on the round trip.
+ assertThat(rowType, is((RowType) normalizeByteShortToInt(ROW_TYPE)));
messageType =
ParquetSchemaConverter.convertToParquetMessageType("flink_schema",
NESTED_ARRAY_MAP_TYPE);
rowType = ParquetSchemaConverter.convertToRowType(messageType);
@@ -134,7 +141,7 @@ public class TestParquetSchemaConverter {
DataTypes.FIELD("f_array",
DataTypes.ARRAY(DataTypes.CHAR(10).notNull())),
DataTypes.FIELD("f_map",
- DataTypes.MAP(DataTypes.INT(), DataTypes.VARCHAR(20))),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.VARCHAR(20))),
DataTypes.FIELD("f_row",
DataTypes.ROW(
DataTypes.FIELD("f_row_f0", DataTypes.INT()),
@@ -154,7 +161,7 @@ public class TestParquetSchemaConverter {
+ " }\n"
+ " optional group f_map (MAP) {\n"
+ " repeated group key_value {\n"
- + " required int32 key;\n"
+ + " required binary key (STRING);\n"
+ " optional binary value (STRING);\n"
+ " }\n"
+ " }\n"
@@ -179,11 +186,12 @@ public class TestParquetSchemaConverter {
DataTypes.FIELD("f_array_f1", DataTypes.VARCHAR(10).notNull()),
DataTypes.FIELD("f_array_f3",
DataTypes.ARRAY(DataTypes.CHAR(10).notNull()))).notNull())),
DataTypes.FIELD("f_map",
- DataTypes.MAP(DataTypes.INT(), DataTypes.ROW(
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.ROW(
DataTypes.FIELD("f_map_f0", DataTypes.INT()),
DataTypes.FIELD("f_map_f1",
DataTypes.VARCHAR(10))).notNull())));
- org.apache.parquet.schema.MessageType messageType =
ParquetSchemaConverter.convertToParquetMessageType("converted", (RowType)
dataType.getLogicalType());
+ org.apache.parquet.schema.MessageType messageType =
+ ParquetSchemaConverter.convertToParquetMessageType("converted",
(RowType) dataType.getLogicalType());
assertThat(messageType.getColumns().size(), is(6));
final String expected = "message converted {\n"
@@ -202,7 +210,7 @@ public class TestParquetSchemaConverter {
+ " }\n"
+ " optional group f_map (MAP) {\n"
+ " repeated group key_value {\n"
- + " required int32 key;\n"
+ + " required binary key (STRING);\n"
+ " required group value {\n"
+ " optional int32 f_map_f0;\n"
+ " optional binary f_map_f1 (STRING);\n"
@@ -213,19 +221,54 @@ public class TestParquetSchemaConverter {
assertThat(messageType.toString(), is(expected));
}
+ @Test
+ void testConvertVectorColumnsWithHoodieSchema() {
+   // One INT column, one vector built with createVector(2) and one DOUBLE vector of dimension 2.
+   HoodieSchemaField idField = HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT));
+   HoodieSchemaField embeddingField = HoodieSchemaField.of("embedding", HoodieSchema.createVector(2));
+   HoodieSchemaField featuresField = HoodieSchemaField.of(
+       "features", HoodieSchema.createVector(2, HoodieSchema.Vector.VectorElementType.DOUBLE));
+   HoodieSchema recordSchema = HoodieSchema.createRecord(
+       "vector_record", null, null, Arrays.asList(idField, embeddingField, featuresField));
+
+   MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType("converted", recordSchema);
+
+   // Both vectors must surface as FIXED_LEN_BYTE_ARRAY; the expected widths are 8 and 16 bytes.
+   PrimitiveType embedding = parquetSchema.getType("embedding").asPrimitiveType();
+   assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, embedding.getPrimitiveTypeName());
+   assertEquals(8, embedding.getTypeLength());
+
+   PrimitiveType features = parquetSchema.getType("features").asPrimitiveType();
+   assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, features.getPrimitiveTypeName());
+   assertEquals(16, features.getTypeLength());
+ }
+
+ @Test
+ void testVectorFooterMetadataComesFromHoodieSchema() {
+   HoodieSchemaField idField = HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT));
+   HoodieSchemaField embeddingField = HoodieSchemaField.of("embedding", HoodieSchema.createVector(2));
+   HoodieSchema recordSchema = HoodieSchema.createRecord(
+       "vector_record", null, null, Arrays.asList(idField, embeddingField));
+
+   // The write support is built directly from the HoodieSchema; no rows are written before finalizing.
+   Map<String, String> footer = new HoodieRowDataParquetWriteSupport(new Configuration(), recordSchema, null)
+       .finalizeWrite()
+       .getExtraMetaData();
+
+   assertEquals("embedding:VECTOR(2)", footer.get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY));
+ }
+
@Test
void testConvertTimestampTypes() {
DataType dataType = DataTypes.ROW(
DataTypes.FIELD("ts_3", DataTypes.TIMESTAMP(3)),
- DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)),
- DataTypes.FIELD("ts_9", DataTypes.TIMESTAMP(9)));
+ DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)));
org.apache.parquet.schema.MessageType messageType =
ParquetSchemaConverter.convertToParquetMessageType("converted",
(RowType) dataType.getLogicalType());
- assertThat(messageType.getColumns().size(), is(3));
+ assertThat(messageType.getColumns().size(), is(2));
final String expected = "message converted {\n"
+ " optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
+ " optional int64 ts_6 (TIMESTAMP(MICROS,true));\n"
- + " optional int96 ts_9;\n"
+ "}\n";
assertThat(messageType.toString(), is(expected));
}
@@ -364,4 +407,24 @@ public class TestParquetSchemaConverter {
"Error message should mention VARIANT");
}
+ /**
+  * Replaces TINYINT/SMALLINT with INT recursively, mirroring the lossy conversion that happens
+  * when a parquet schema is built from the avro-based {@link HoodieSchema}.
+  * The problem is tracked here: <a href="https://github.com/apache/hudi/issues/18974">Issue#18974</a>.
+  *
+  * <p>Only the integer width is changed: nullability, row field names and row field descriptions
+  * are carried over unchanged, and ARRAY, MAP and ROW types are traversed.
+  *
+  * @param type the Flink logical type to normalize
+  * @return an equivalent type in which every TINYINT/SMALLINT is replaced by INT
+  */
+ private static LogicalType normalizeByteShortToInt(LogicalType type) {
+   if (type instanceof TinyIntType || type instanceof SmallIntType) {
+     // Keep NOT NULL constraints instead of falling back to the nullable default.
+     return new IntType(type.isNullable());
+   } else if (type instanceof ArrayType) {
+     return new ArrayType(type.isNullable(), normalizeByteShortToInt(((ArrayType) type).getElementType()));
+   } else if (type instanceof MapType) {
+     MapType map = (MapType) type;
+     return new MapType(
+         type.isNullable(),
+         normalizeByteShortToInt(map.getKeyType()),
+         normalizeByteShortToInt(map.getValueType()));
+   } else if (type instanceof RowType) {
+     RowType row = (RowType) type;
+     RowType.RowField[] fields = new RowType.RowField[row.getFieldCount()];
+     for (int i = 0; i < fields.length; i++) {
+       RowType.RowField field = row.getFields().get(i);
+       // Rebuild the field around the normalized type so its name and description survive.
+       fields[i] = new RowType.RowField(
+           field.getName(),
+           normalizeByteShortToInt(field.getType()),
+           field.getDescription().orElse(null));
+     }
+     return new RowType(row.isNullable(), Arrays.asList(fields));
+   }
+   return type;
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 25f05eda627c..22a7c711ea4e 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -524,6 +524,63 @@ public class TestHoodieSchemaConverter {
assertFalse(rowType.getTypeAt(1).isNullable());
}
+ @Test
+ public void testConvertVectorColumnsFromFlinkSchema() {
+   // Vector candidates: NOT NULL ARRAY<FLOAT>, nullable ARRAY<DOUBLE> and NOT NULL ARRAY<TINYINT>.
+   DataType rowDataType = DataTypes.ROW(
+       DataTypes.FIELD("id", DataTypes.STRING().notNull()),
+       DataTypes.FIELD("embedding", DataTypes.ARRAY(DataTypes.FLOAT().notNull()).notNull()),
+       DataTypes.FIELD("features", DataTypes.ARRAY(DataTypes.DOUBLE().notNull()).nullable()),
+       DataTypes.FIELD("codes", DataTypes.ARRAY(DataTypes.TINYINT().notNull()).notNull()))
+       .notNull();
+   RowType rowType = (RowType) rowDataType.getLogicalType();
+
+   // The spec mixes padding whitespace, an omitted dimension and explicit dimensions.
+   HoodieSchema converted =
+       HoodieSchemaConverter.convertToSchema(rowType, "test_record", " embedding , features:64,codes:32 ");
+
+   // No dimension was given for "embedding", so 128 is expected.
+   HoodieSchema embeddingSchema = converted.getField("embedding").get().schema().getNonNullType();
+   assertEquals(HoodieSchemaType.VECTOR, embeddingSchema.getType());
+   HoodieSchema.Vector embeddingVector = (HoodieSchema.Vector) embeddingSchema;
+   assertEquals(128, embeddingVector.getDimension());
+   assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, embeddingVector.getVectorElementType());
+
+   // The nullable Flink column must yield a nullable schema around the vector.
+   HoodieSchema featuresSchema = converted.getField("features").get().schema();
+   assertTrue(featuresSchema.isNullable());
+   HoodieSchema.Vector featuresVector = (HoodieSchema.Vector) featuresSchema.getNonNullType();
+   assertEquals(64, featuresVector.getDimension());
+   assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, featuresVector.getVectorElementType());
+
+   HoodieSchema.Vector codesVector =
+       (HoodieSchema.Vector) converted.getField("codes").get().schema().getNonNullType();
+   assertEquals(32, codesVector.getDimension());
+   assertEquals(HoodieSchema.Vector.VectorElementType.INT8, codesVector.getVectorElementType());
+
+   // Second conversion: record name "record" and an explicit dimension of 2.
+   HoodieSchema namedSchema = HoodieSchemaConverter.convertToSchema(rowType, "record", "embedding:2");
+   assertEquals("record", namedSchema.getName());
+   HoodieSchema.Vector namedEmbedding =
+       (HoodieSchema.Vector) namedSchema.getField("embedding").get().schema().getNonNullType();
+   assertEquals(2, namedEmbedding.getDimension());
+ }
+
+ @Test
+ public void testConvertVectorColumnsValidation() {
+   DataType rowDataType = DataTypes.ROW(
+       DataTypes.FIELD("embedding", DataTypes.ARRAY(DataTypes.FLOAT().notNull()).notNull()),
+       DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull()),
+       DataTypes.FIELD("plain", DataTypes.STRING()))
+       .notNull();
+   RowType rowType = (RowType) rowDataType.getLogicalType();
+
+   // In order: zero dimension, duplicated column, column absent from the row type,
+   // non-array column, and array column with STRING elements.
+   String[] invalidSpecs = {"embedding:0", "embedding,embedding:64", "missing", "plain", "tags"};
+   for (String invalidSpec : invalidSpecs) {
+     assertThrows(IllegalArgumentException.class,
+         () -> HoodieSchemaConverter.convertToSchema(rowType, "test_record", invalidSpec));
+   }
+ }
+
private void assertVectorArray(DataType dataType, LogicalTypeRoot
elementTypeRoot, boolean nullable) {
ArrayType arrayType = assertInstanceOf(ArrayType.class,
dataType.getLogicalType());
assertEquals(elementTypeRoot, arrayType.getElementType().getTypeRoot());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 98bd1df034dc..a191842b6c5b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -73,6 +73,19 @@ public final class HoodieSchemaUtils {
throw new UnsupportedOperationException("Utility class cannot be
instantiated");
}
+  /**
+   * Looks up a field by name and returns the schema declared for it.
+   *
+   * <p>The lookup is done on the result of {@code schema.getNonNullType()}.
+   *
+   * @param schema    record schema expected to contain the field
+   * @param fieldName name of the field to look up
+   * @return the schema of the matching field
+   * @throws HoodieSchemaException if no field with the given name is found
+   */
+  public static HoodieSchema getFieldSchema(HoodieSchema schema, String fieldName) {
+    return schema.getNonNullType()
+        .getField(fieldName)
+        .orElseThrow(() -> new HoodieSchemaException("Field " + fieldName + " doesn't exist in schema: " + schema))
+        .schema();
+  }
+
/**
* Creates a write schema for Hudi operations, adding necessary metadata
fields.
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 357fa51b603b..2a4c5f8815fd 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -374,6 +374,14 @@ public class FlinkOptions extends HoodieConfig {
.noDefaultValue()
.withDescription("Source avro schema string, the parsed schema is used
for deserialization");
+ public static final ConfigOption<String> VECTOR_COLUMNS = ConfigOptions // optional table option listing the ARRAY columns to treat as VECTOR
+ .key("hoodie.vector.columns")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Comma-separated top-level VECTOR columns in the format
columnName[:dimension]. "
+ + "The dimension defaults to 128 when omitted. The element type is
inferred from the Flink "
+ + "ARRAY element type: FLOAT, DOUBLE, or TINYINT.");
+
public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
public static final String QUERY_TYPE_INCREMENTAL = "incremental";
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 5c324235251c..a6eee1caf517 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -83,7 +83,7 @@ public class BucketBulkInsertWriterHelper extends
BulkInsertWriterHelper {
close();
}
HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
- instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata, isAppendMode && !populateMetaFields);
+ instantTime, taskPartitionId, totalSubtaskNum, taskEpochId,
writerSchema, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
handles.put(fileId, rowCreateHandle);
}
return handles.get(fileId);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 8cb5ef9fdeb4..14e7ea59191c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -20,6 +20,8 @@ package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -29,7 +31,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -66,7 +68,8 @@ public class BulkInsertWriterHelper {
protected final long taskEpochId;
protected final HoodieTable hoodieTable;
protected final HoodieWriteConfig writeConfig;
- protected final RowType rowType;
+ // table schema handed to the write handles.
+ protected final HoodieSchema writerSchema;
protected final boolean preserveHoodieMetadata;
protected final boolean isAppendMode;
// used for Append mode only, if true then only initial row data without
metacolumns is written
@@ -104,9 +107,13 @@ public class BulkInsertWriterHelper {
this.taskEpochId = taskEpochId;
this.isAppendMode = OptionsResolver.isAppendMode(conf);
this.populateMetaFields = writeConfig.populateMetaFields();
- this.rowType = preserveHoodieMetadata || (isAppendMode &&
!populateMetaFields)
- ? rowType
- : DataTypeUtils.addMetadataFields(rowType,
writeConfig.allowOperationMetadataField());
+ HoodieSchema schema = HoodieSchemaConverter.convertToSchema(
+ rowType,
+
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+ conf.get(FlinkOptions.VECTOR_COLUMNS));
+ this.writerSchema = preserveHoodieMetadata || (isAppendMode &&
!populateMetaFields)
+ ? schema
+ : HoodieSchemaUtils.addMetadataFields(schema,
writeConfig.allowOperationMetadataField());
this.preserveHoodieMetadata = preserveHoodieMetadata;
this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) &&
conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
@@ -147,7 +154,7 @@ public class BulkInsertWriterHelper {
log.info("Creating new file for partition path " + partitionPath);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextFileId(),
- instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata, isAppendMode && !populateMetaFields);
+ instantTime, taskPartitionId, totalSubtaskNum, taskEpochId,
writerSchema, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
handles.put(partitionPath, rowCreateHandle);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
@@ -208,7 +215,7 @@ public class BulkInsertWriterHelper {
private HoodieRowDataCreateHandle createWriteHandle(String partitionPath) {
writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextFileId(),
- instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata, isAppendMode && !populateMetaFields);
+ instantTime, taskPartitionId, totalSubtaskNum, taskEpochId,
writerSchema, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
return rowCreateHandle;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 2fb4031cd7b4..12a0b1092588 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -520,7 +520,10 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
private static void inferAvroSchema(Configuration conf, LogicalType rowType)
{
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isEmpty()
&& conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isEmpty()) {
- String inferredSchema = HoodieSchemaConverter.convertToSchema(rowType,
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString();
+ String inferredSchema = HoodieSchemaConverter.convertToSchema(
+ rowType,
+
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+ conf.get(FlinkOptions.VECTOR_COLUMNS)).toString();
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 9dd79cde9f2d..76d4bae29a88 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -685,7 +685,10 @@ public class HoodieTableSource extends FileIndexReader
implements
}
private HoodieSchema inferSchemaFromDdl() {
- HoodieSchema schema =
HoodieSchemaConverter.convertToSchema(this.tableRowType);
+ HoodieSchema schema = HoodieSchemaConverter.convertToSchema(
+ this.tableRowType,
+
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+ conf.get(FlinkOptions.VECTOR_COLUMNS));
return HoodieSchemaUtils.addMetadataFields(schema,
conf.get(FlinkOptions.CHANGELOG_ENABLED));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index a8bf340f64e2..e1eca3ded180 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -70,6 +70,7 @@ import
org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -319,9 +320,11 @@ public class HoodieCatalog extends AbstractCatalog {
&& !OptionsResolver.isAppendMode(conf)) {
throw new CatalogException("Primary key definition is missing");
}
+ LogicalType rowType =
resolvedSchema.toPhysicalRowDataType().getLogicalType();
final String avroSchema = HoodieSchemaConverter.convertToSchema(
- resolvedSchema.toPhysicalRowDataType().getLogicalType(),
-
HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName())).toString();
+ rowType,
+ HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName()),
+ conf.get(FlinkOptions.VECTOR_COLUMNS)).toString();
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
// stores two copies of options:
@@ -619,9 +622,11 @@ public class HoodieCatalog extends AbstractCatalog {
private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable
newCatalogTable) {
Map<String, String> options = newCatalogTable.getOptions();
ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable)
newCatalogTable;
+ LogicalType rowType =
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType();
final String avroSchema = HoodieSchemaConverter.convertToSchema(
-
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
-
HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName())).toString();
+ rowType,
+ HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName()),
+ options.get(FlinkOptions.VECTOR_COLUMNS.key())).toString();
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema);
java.util.Optional<UniqueConstraint> pkConstraintOpt =
resolvedTable.getResolvedSchema().getPrimaryKey();
if (pkConstraintOpt.isPresent()) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c42bdf281174..b34bc705bb29 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -80,6 +80,7 @@ import
org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -494,9 +495,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
private HoodieTableMetaClient initTableIfNotExists(ObjectPath tablePath,
CatalogTable catalogTable) {
Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
+ RowType rowType =
DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema());
final String avroSchema = HoodieSchemaConverter.convertToSchema(
- DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
-
HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName())).toString();
+ rowType,
+ HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName()),
+ flinkConf.get(FlinkOptions.VECTOR_COLUMNS)).toString();
flinkConf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
// stores two copies of options:
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index b2f5d428cdaf..6b4749c2f26e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -203,7 +203,10 @@ public class TableOptionProperties {
List<String> partitionKeys,
boolean withOperationField) {
RowType rowType =
supplementMetaFields(DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
withOperationField);
- HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
+ HoodieSchema schema = HoodieSchemaConverter.convertToSchema(
+ rowType,
+ "record",
+ catalogTable.getOptions().get(FlinkOptions.VECTOR_COLUMNS.key()));
String sparkVersion =
catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
Map<String, String> sparkTableProperties =
SparkDataSourceTableUtils.getSparkTableProperties(
partitionKeys,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
new file mode 100644
index 000000000000..1b6cda8c1013
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.lang.reflect.Array;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end tests for Flink vector column read/write support.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVectorDataSource {
+
+ @TempDir
+ Path tempDir;
+ /** Writes rows with FLOAT, DOUBLE and TINYINT vector columns (one of them NULL) and reads them back, once per table type. */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testVectorRoundTrip(HoodieTableType tableType) throws Exception {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ String tablePath = tempDir.resolve("round_trip_" +
tableType.name()).toUri().toString();
+ createVectorTable(
+ tableEnv,
+ "vector_table",
+ tablePath,
+ tableType,
+ "embedding:4,features:3,codes:4,nullable_embedding:2");
+ // Insert two rows; the second row stores NULL in nullable_embedding.
+ execInsertSql(tableEnv,
+ "INSERT INTO vector_table VALUES "
+ + "('id1', "
+ + " ARRAY[CAST(1.0 AS FLOAT), CAST(2.0 AS FLOAT), CAST(3.0 AS
FLOAT), CAST(4.0 AS FLOAT)], "
+ + " ARRAY[CAST(0.1 AS DOUBLE), CAST(0.2 AS DOUBLE), CAST(0.3 AS
DOUBLE)], "
+ + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT), CAST(3 AS
TINYINT), CAST(4 AS TINYINT)], "
+ + " ARRAY[CAST(9.0 AS FLOAT), CAST(9.5 AS FLOAT)], "
+ + " 'label1', ARRAY['red', 'blue'], 1000), "
+ + "('id2', "
+ + " ARRAY[CAST(-1.0 AS FLOAT), CAST(-2.0 AS FLOAT), CAST(-3.0 AS
FLOAT), CAST(-4.0 AS FLOAT)], "
+ + " ARRAY[CAST(1.1 AS DOUBLE), CAST(1.2 AS DOUBLE), CAST(1.3 AS
DOUBLE)], "
+ + " ARRAY[CAST(-1 AS TINYINT), CAST(-2 AS TINYINT), CAST(-3 AS
TINYINT), CAST(-4 AS TINYINT)], "
+ + " CAST(NULL AS ARRAY<FLOAT>), "
+ + " 'label2', ARRAY['green'], 2000)");
+ // The table create schema must list each configured column as a VECTOR with the declared dimension and element type.
+ assertStoredVectorSchema(tablePath, "embedding", 4,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ assertStoredVectorSchema(tablePath, "features", 3,
HoodieSchema.Vector.VectorElementType.DOUBLE);
+ assertStoredVectorSchema(tablePath, "codes", 4,
HoodieSchema.Vector.VectorElementType.INT8);
+ assertStoredVectorSchema(tablePath, "nullable_embedding", 2,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ // Full projection: every column must come back with the inserted values.
+ List<Row> rows = collect(tableEnv,
+ "SELECT id, embedding, features, codes, nullable_embedding, label,
tags, ts FROM vector_table ORDER BY id");
+ assertEquals(2, rows.size());
+ // Row id1: all four vector columns are populated.
+ Row first = rows.get(0);
+ assertEquals("id1", first.getField(0));
+ assertFloatArray(first.getField(1), new float[] {1.0f, 2.0f, 3.0f, 4.0f});
+ assertDoubleArray(first.getField(2), new double[] {0.1d, 0.2d, 0.3d});
+ assertByteArray(first.getField(3), new byte[] {1, 2, 3, 4});
+ assertFloatArray(first.getField(4), new float[] {9.0f, 9.5f});
+ assertEquals("label1", first.getField(5));
+ assertObjectArray(first.getField(6), new Object[] {"red", "blue"});
+ assertEquals(1000L, first.getField(7));
+ // Row id2: nullable_embedding is expected to be null.
+ Row second = rows.get(1);
+ assertEquals("id2", second.getField(0));
+ assertFloatArray(second.getField(1), new float[] {-1.0f, -2.0f, -3.0f,
-4.0f});
+ assertDoubleArray(second.getField(2), new double[] {1.1d, 1.2d, 1.3d});
+ assertByteArray(second.getField(3), new byte[] {-1, -2, -3, -4});
+ assertNull(second.getField(4));
+ assertEquals("label2", second.getField(5));
+ assertObjectArray(second.getField(6), new Object[] {"green"});
+ assertEquals(2000L, second.getField(7));
+ // Projection that selects no vector column.
+ List<Row> nonVectorProjection = collect(tableEnv, "SELECT id, label, ts
FROM vector_table ORDER BY id");
+ assertEquals(Row.of("id1", "label1", 1000L), nonVectorProjection.get(0));
+ assertEquals(Row.of("id2", "label2", 2000L), nonVectorProjection.get(1));
+ // Projection of a single vector column combined with a filter on id.
+ List<Row> vectorProjection = collect(tableEnv, "SELECT id, embedding FROM
vector_table WHERE id = 'id2'");
+ assertEquals(1, vectorProjection.size());
+ assertFloatArray(vectorProjection.get(0).getField(1), new float[] {-1.0f,
-2.0f, -3.0f, -4.0f});
+ }
+ /** Round-trips vector columns for a table created with the write operation option set to 'insert', once per table type. */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testVectorAppendWriteRoundTrip(HoodieTableType tableType) throws
Exception {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ String tablePath = tempDir.resolve("append_write_" +
tableType.name()).toUri().toString();
+ createVectorTable(tableEnv, "vector_table", tablePath, tableType,
"embedding:3,features:2,codes:3", "insert");
+ // Only a subset of columns is written; nullable_embedding and tags are left out of the INSERT column list.
+ execInsertSql(tableEnv,
+ "INSERT INTO vector_table(id, embedding, features, codes, label, ts)
VALUES "
+ + "('id1', "
+ + " ARRAY[CAST(1.0 AS FLOAT), CAST(1.5 AS FLOAT), CAST(2.0 AS
FLOAT)], "
+ + " ARRAY[CAST(0.1 AS DOUBLE), CAST(0.2 AS DOUBLE)], "
+ + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT), CAST(3 AS
TINYINT)], "
+ + " 'append1', 1000), "
+ + "('id2', "
+ + " ARRAY[CAST(3.0 AS FLOAT), CAST(3.5 AS FLOAT), CAST(4.0 AS
FLOAT)], "
+ + " ARRAY[CAST(1.1 AS DOUBLE), CAST(1.2 AS DOUBLE)], "
+ + " ARRAY[CAST(-1 AS TINYINT), CAST(-2 AS TINYINT), CAST(-3 AS
TINYINT)], "
+ + " 'append2', 2000)");
+ // The stored schema must carry the configured dimensions and element types.
+ assertStoredVectorSchema(tablePath, "embedding", 3,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ assertStoredVectorSchema(tablePath, "features", 2,
HoodieSchema.Vector.VectorElementType.DOUBLE);
+ assertStoredVectorSchema(tablePath, "codes", 3,
HoodieSchema.Vector.VectorElementType.INT8);
+ // Read the written columns back in id order.
+ List<Row> rows = collect(tableEnv, "SELECT id, embedding, features, codes,
label, ts FROM vector_table ORDER BY id");
+ assertEquals(2, rows.size());
+ // Row id1.
+ assertEquals("id1", rows.get(0).getField(0));
+ assertFloatArray(rows.get(0).getField(1), new float[] {1.0f, 1.5f, 2.0f});
+ assertDoubleArray(rows.get(0).getField(2), new double[] {0.1d, 0.2d});
+ assertByteArray(rows.get(0).getField(3), new byte[] {1, 2, 3});
+ assertEquals("append1", rows.get(0).getField(4));
+ assertEquals(1000L, rows.get(0).getField(5));
+ // Row id2.
+ assertEquals("id2", rows.get(1).getField(0));
+ assertFloatArray(rows.get(1).getField(1), new float[] {3.0f, 3.5f, 4.0f});
+ assertDoubleArray(rows.get(1).getField(2), new double[] {1.1d, 1.2d});
+ assertByteArray(rows.get(1).getField(3), new byte[] {-1, -2, -3});
+ assertEquals("append2", rows.get(1).getField(4));
+ assertEquals(2000L, rows.get(1).getField(5));
+ }
+ /** Inserts two batches with an overlapping key and checks which vector values are returned for each key. */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testVectorUpsert(HoodieTableType tableType) throws Exception {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ String tablePath = tempDir.resolve("upsert_" +
tableType.name()).toUri().toString();
+ createVectorTable(tableEnv, "vector_table", tablePath, tableType,
"embedding:2");
+ // First batch writes id1 and id2; second batch rewrites id1 with a larger ts and adds id3.
+ execInsertSql(tableEnv,
+ "INSERT INTO vector_table(id, embedding, label, ts) VALUES "
+ + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(1.1 AS FLOAT)], 'old1',
1), "
+ + "('id2', ARRAY[CAST(2.0 AS FLOAT), CAST(2.2 AS FLOAT)], 'old2',
1)");
+ execInsertSql(tableEnv,
+ "INSERT INTO vector_table(id, embedding, label, ts) VALUES "
+ + "('id1', ARRAY[CAST(9.0 AS FLOAT), CAST(9.9 AS FLOAT)], 'new1',
10), "
+ + "('id3', ARRAY[CAST(3.0 AS FLOAT), CAST(3.3 AS FLOAT)], 'new3',
1)");
+ // Three distinct keys are expected after both writes.
+ List<Row> rows = collect(tableEnv, "SELECT id, embedding, label, ts FROM
vector_table ORDER BY id");
+ assertEquals(3, rows.size());
+ // id1 must expose the values of the second write.
+ assertEquals("id1", rows.get(0).getField(0));
+ assertFloatArray(rows.get(0).getField(1), new float[] {9.0f, 9.9f});
+ assertEquals("new1", rows.get(0).getField(2));
+ assertEquals(10L, rows.get(0).getField(3));
+ // id2 was written only by the first batch and keeps those values.
+ assertEquals("id2", rows.get(1).getField(0));
+ assertFloatArray(rows.get(1).getField(1), new float[] {2.0f, 2.2f});
+ assertEquals("old2", rows.get(1).getField(2));
+ // id3 was written only by the second batch.
+ assertEquals("id3", rows.get(2).getField(0));
+ assertFloatArray(rows.get(2).getField(1), new float[] {3.0f, 3.3f});
+ assertEquals("new3", rows.get(2).getField(2));
+ }
+ /** Expects the insert to fail when a 2-element array is written to a column configured as embedding:3. */
+ @Test
+ public void testVectorDimensionMismatchFails() {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ String tablePath =
tempDir.resolve("dimension_mismatch").toUri().toString();
+ createVectorTable(tableEnv, "vector_table", tablePath,
HoodieTableType.COPY_ON_WRITE, "embedding:3");
+ // The failure may be wrapped, so the whole cause chain is searched for the message.
+ Exception exception = assertThrows(Exception.class, () ->
execInsertSql(tableEnv,
+ "INSERT INTO vector_table(id, embedding, label, ts) VALUES "
+ + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(2.0 AS FLOAT)], 'bad',
1)"));
+ assertTrue(containsMessage(exception, "dimension mismatch"),
+ "Expected dimension mismatch in exception chain, got: " +
exception.getMessage());
+ }
+ /** Creates the vector test table without a write operation option (delegates with a null writeOperation). */
+ private static void createVectorTable(
+ TableEnvironment tableEnv,
+ String tableName,
+ String tablePath,
+ HoodieTableType tableType,
+ String vectorColumns) {
+ createVectorTable(tableEnv, tableName, tablePath, tableType,
vectorColumns, null);
+ }
+ /** Executes the CREATE TABLE DDL for the vector test table; the write operation option is emitted only when writeOperation is non-null. */
+ private static void createVectorTable(
+ TableEnvironment tableEnv,
+ String tableName,
+ String tablePath,
+ HoodieTableType tableType,
+ String vectorColumns,
+ String writeOperation) {
+ tableEnv.executeSql(
+ "CREATE TABLE " + tableName + " ("
+ + " id STRING,"
+ + " embedding ARRAY<FLOAT>,"
+ + " features ARRAY<DOUBLE>,"
+ + " codes ARRAY<TINYINT>,"
+ + " nullable_embedding ARRAY<FLOAT>,"
+ + " label STRING,"
+ + " tags ARRAY<STRING>,"
+ + " ts BIGINT,"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'hudi',"
+ + " 'path' = '" + tablePath + "',"
+ + " '" + FlinkOptions.TABLE_TYPE.key() + "' = '" +
tableType.name() + "',"
+ + " '" + FlinkOptions.ORDERING_FIELDS.key() + "' = 'ts',"
+ + " '" + FlinkOptions.VECTOR_COLUMNS.key() + "' = '" +
vectorColumns + "',"
+ + " '" + FlinkOptions.METADATA_ENABLED.key() + "' = 'false',"
+ + " '" + FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key() + "' =
'false',"
+ + " '" + FlinkOptions.COMPACTION_ASYNC_ENABLED.key() + "' =
'false',"
+ + (writeOperation == null ? "" : " '" +
FlinkOptions.OPERATION.key() + "' = '" + writeOperation + "',")
+ + " '" + FlinkOptions.WRITE_TASKS.key() + "' = '1',"
+ + " '" + FlinkOptions.READ_TASKS.key() + "' = '1'"
+ + ")");
+ }
+
+  /**
+   * Submits the given INSERT statement and waits on the returned {@link TableResult}.
+   *
+   * @param tableEnv table environment used to execute the statement
+   * @param insert   INSERT statement to run
+   * @throws ExecutionException   if waiting on the result fails
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  private static void execInsertSql(TableEnvironment tableEnv, String insert)
+      throws ExecutionException, InterruptedException {
+    final TableResult insertResult = tableEnv.executeSql(insert);
+    insertResult.await();
+  }
+
+  /**
+   * Executes the query and materializes all result rows into a list.
+   *
+   * <p>{@code TableResult#collect()} hands back a closeable iterator. It is closed here,
+   * both after a full drain and when draining fails, so the resources behind the
+   * result are released instead of lingering until the test JVM exits.
+   *
+   * @param tableEnv table environment used to execute the query
+   * @param query    SELECT statement to run
+   * @return all rows produced by the query, in iteration order
+   * @throws Exception if closing the result iterator fails
+   */
+  private static List<Row> collect(TableEnvironment tableEnv, String query) throws Exception {
+    try (CloseableIterator<Row> resultRows = tableEnv.executeSql(query).collect()) {
+      return CollectionUtil.iteratorToList(resultRows);
+    }
+  }
+ /** Reads the table create schema from the table config at tablePath and asserts that fieldName is a VECTOR with the given dimension and element type. */
+ private static void assertStoredVectorSchema(
+ String tablePath,
+ String fieldName,
+ int dimension,
+ HoodieSchema.Vector.VectorElementType elementType) {
+ Configuration conf = new Configuration();
+ HoodieSchema tableSchema = StreamerUtil.getTableConfig(tablePath,
HadoopConfigurations.getHadoopConf(conf))
+ .flatMap(config -> config.getTableCreateSchema())
+ .orElseThrow(() -> new AssertionError("Expected table create schema
for " + tablePath));
+ Option<HoodieSchemaField> fieldOpt = tableSchema.getField(fieldName);
+ assertTrue(fieldOpt.isPresent(), "Expected field " + fieldName + " in
table schema");
+ HoodieSchema fieldSchema = fieldOpt.get().schema().getNonNullType();
+ assertEquals(HoodieSchemaType.VECTOR, fieldSchema.getType());
+ HoodieSchema.Vector vector = (HoodieSchema.Vector) fieldSchema;
+ assertEquals(dimension, vector.getDimension());
+ assertEquals(elementType, vector.getVectorElementType());
+ }
+
+  /**
+   * Returns the number of elements in {@code value}, which may be a {@link List} or a Java array.
+   * Fails the test when {@code value} is null.
+   */
+  private static int arrayLength(Object value) {
+    assertNotNull(value);
+    return value instanceof List
+        ? ((List<?>) value).size()
+        : Array.getLength(value);
+  }
+
+  /**
+   * Returns the element at {@code index} of {@code value}, which may be a {@link List} or a Java array.
+   * Primitive array elements are returned boxed, as done by {@link Array#get(Object, int)}.
+   */
+  private static Object arrayValue(Object value, int index) {
+    return value instanceof List
+        ? ((List<?>) value).get(index)
+        : Array.get(value, index);
+  }
+
+  /** Asserts that {@code value} holds exactly the floats in {@code expected}, within a tolerance of 0.00001. */
+  private static void assertFloatArray(Object value, float[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    int position = 0;
+    for (float want : expected) {
+      float got = ((Number) arrayValue(value, position)).floatValue();
+      assertEquals(want, got, 0.00001f);
+      position++;
+    }
+  }
+
+  /** Asserts that {@code value} holds exactly the doubles in {@code expected}, within a tolerance of 0.0000001. */
+  private static void assertDoubleArray(Object value, double[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    int position = 0;
+    for (double want : expected) {
+      double got = ((Number) arrayValue(value, position)).doubleValue();
+      assertEquals(want, got, 0.0000001d);
+      position++;
+    }
+  }
+
+  /** Asserts that {@code value} holds exactly the bytes in {@code expected}, element by element. */
+  private static void assertByteArray(Object value, byte[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    int position = 0;
+    for (byte want : expected) {
+      byte got = ((Number) arrayValue(value, position)).byteValue();
+      assertEquals(want, got);
+      position++;
+    }
+  }
+
+  /** Asserts that {@code value} holds exactly the objects in {@code expected}, compared element by element. */
+  private static void assertObjectArray(Object value, Object[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    int position = 0;
+    for (Object want : expected) {
+      assertEquals(want, arrayValue(value, position));
+      position++;
+    }
+  }
+
+  /**
+   * Tells whether {@code throwable} or any throwable in its cause chain has a message
+   * containing {@code message}. Throwables with a null message are skipped.
+   *
+   * @param throwable head of the cause chain; may be null
+   * @param message   text to search for
+   * @return true if some message in the chain contains the text, false otherwise
+   */
+  private static boolean containsMessage(Throwable throwable, String message) {
+    for (Throwable current = throwable; current != null; current = current.getCause()) {
+      String text = current.getMessage();
+      if (text != null && text.contains(message)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
index d7c68b7e318d..d234fb782dbe 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
@@ -32,6 +32,7 @@ import org.apache.flink.formats.json.JsonToRowDataConverters;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
@@ -39,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -201,4 +203,67 @@ class TestRowDataToAvroConverters {
+ (typeValue == null ? "null" : typeValue.getClass().getName()));
Assertions.assertEquals("OUT_OF_LINE", typeValue.toString());
}
-}
\ No newline at end of file
+
+ /**
+  * Converts one row carrying three vector columns (one created with the single-argument
+  * {@code createVector}, one DOUBLE, one INT8) to Avro and checks the raw bytes of each
+  * resulting {@code GenericData.Fixed} value against the expected encoding.
+  */
+ @Test
+ void testRowDataToAvroVectorEncoding() {
+   HoodieSchemaField idField =
+       HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT));
+   HoodieSchemaField embeddingField =
+       HoodieSchemaField.of("embedding", HoodieSchema.createVector(2));
+   HoodieSchemaField featuresField = HoodieSchemaField.of(
+       "features",
+       HoodieSchema.createVector(2, HoodieSchema.Vector.VectorElementType.DOUBLE));
+   HoodieSchemaField codesField = HoodieSchemaField.of(
+       "codes",
+       HoodieSchema.createVector(3, HoodieSchema.Vector.VectorElementType.INT8));
+   HoodieSchema schema = HoodieSchema.createRecord(
+       "vector_record",
+       null,
+       null,
+       Arrays.asList(idField, embeddingField, featuresField, codesField));
+
+   DataType rowDataType = HoodieSchemaConverter.convertToDataType(schema);
+   RowType rowType = (RowType) rowDataType.getLogicalType();
+   RowDataToAvroConverters.RowDataToAvroConverter converter =
+       RowDataToAvroConverters.createConverter(rowType);
+
+   GenericArrayData floatVector = new GenericArrayData(new float[] {1.0f, 2.0f});
+   GenericArrayData doubleVector = new GenericArrayData(new double[] {3.0d, 4.0d});
+   GenericArrayData int8Vector = new GenericArrayData(new byte[] {5, 6, 7});
+   GenericRowData rowData = GenericRowData.of(1, floatVector, doubleVector, int8Vector);
+   GenericRecord avroRecord = (GenericRecord) converter.convert(schema, rowData);
+
+   // Each vector field is checked in turn so the first mismatch is the one reported.
+   byte[] expectedEmbedding = vectorBytes(1.0f, 2.0f);
+   byte[] actualEmbedding = ((GenericData.Fixed) avroRecord.get("embedding")).bytes();
+   Assertions.assertArrayEquals(expectedEmbedding, actualEmbedding);
+
+   byte[] expectedFeatures = vectorBytes(3.0d, 4.0d);
+   byte[] actualFeatures = ((GenericData.Fixed) avroRecord.get("features")).bytes();
+   Assertions.assertArrayEquals(expectedFeatures, actualFeatures);
+
+   byte[] expectedCodes = new byte[] {5, 6, 7};
+   byte[] actualCodes = ((GenericData.Fixed) avroRecord.get("codes")).bytes();
+   Assertions.assertArrayEquals(expectedCodes, actualCodes);
+ }
+
+ @Test
+ void testRowDataToAvroVectorValidation() {
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "vector_record",
+ null,
+ null,
+ Arrays.asList(HoodieSchemaField.of("embedding",
HoodieSchema.createVector(2))));
+ RowType rowType = (RowType)
HoodieSchemaConverter.convertToDataType(schema).getLogicalType();
+ RowDataToAvroConverters.RowDataToAvroConverter converter =
+ RowDataToAvroConverters.createConverter(rowType);
+
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> converter.convert(schema, GenericRowData.of(new
GenericArrayData(new float[] {1.0f}))));
+ }
+
+ /**
+  * Writes {@code values} back-to-back as {@code Float.BYTES}-wide floats using
+  * {@code HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER} and returns the bytes.
+  */
+ private static byte[] vectorBytes(float... values) {
+   byte[] encoded = new byte[values.length * Float.BYTES];
+   // Puts on the wrapping buffer land directly in the array that is returned.
+   ByteBuffer writer = ByteBuffer.wrap(encoded)
+       .order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+   for (int i = 0; i < values.length; i++) {
+     writer.putFloat(values[i]);
+   }
+   return encoded;
+ }
+
+ /**
+  * Writes {@code values} back-to-back as {@code Double.BYTES}-wide doubles using
+  * {@code HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER} and returns the bytes.
+  */
+ private static byte[] vectorBytes(double... values) {
+   byte[] encoded = new byte[values.length * Double.BYTES];
+   // Puts on the wrapping buffer land directly in the array that is returned.
+   ByteBuffer writer = ByteBuffer.wrap(encoded)
+       .order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+   for (int i = 0; i < values.length; i++) {
+     writer.putDouble(values[i]);
+   }
+   return encoded;
+ }
+}