This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7af4fb9acfae feat(flink): Support writing VECTOR columns for flink writer (#18877)
7af4fb9acfae is described below

commit 7af4fb9acfae9572fe2bcd221e9799e525d915d1
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Jun 22 11:06:13 2026 +0800

    feat(flink): Support writing VECTOR columns for flink writer (#18877)
    
    * feat(flink): Support writing VECTOR columns for flink writer
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../io/storage/row/HoodieRowDataCreateHandle.java  |  14 +-
 .../row/HoodieRowDataParquetWriteSupport.java      |   6 +
 .../io/storage/row/RowDataParquetWriteSupport.java |  16 +-
 .../storage/row/parquet/ParquetRowDataWriter.java  |  57 +++-
 .../row/parquet/ParquetSchemaConverter.java        |  60 +++-
 .../apache/hudi/util/HoodieSchemaConverter.java    |  70 ++++-
 .../apache/hudi/util/RowDataToAvroConverters.java  |   5 +-
 .../org/apache/hudi/util/VectorColumnParser.java   | 130 ++++++++
 .../apache/hudi/util/VectorConversionUtils.java    |  35 +++
 .../storage/row/TestHoodieRowDataCreateHandle.java |  68 ++---
 .../TestHoodieRowDataParquetConfigInjector.java    |  50 ++++
 .../row/parquet/TestParquetSchemaConverter.java    |  85 +++++-
 .../hudi/util/TestHoodieSchemaConverter.java       |  57 ++++
 .../hudi/common/schema/HoodieSchemaUtils.java      |  13 +
 .../apache/hudi/configuration/FlinkOptions.java    |   8 +
 .../sink/bucket/BucketBulkInsertWriterHelper.java  |   2 +-
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  21 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |   5 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |   5 +-
 .../apache/hudi/table/catalog/HoodieCatalog.java   |  13 +-
 .../hudi/table/catalog/HoodieHiveCatalog.java      |   7 +-
 .../hudi/table/catalog/TableOptionProperties.java  |   5 +-
 .../apache/hudi/table/ITTestVectorDataSource.java  | 332 +++++++++++++++++++++
 .../hudi/utils/TestRowDataToAvroConverters.java    |  67 ++++-
 24 files changed, 1024 insertions(+), 107 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index e3a8445d6675..35953870957d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -79,6 +80,8 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
   private final String fileId;
   private final boolean preserveHoodieMetadata;
   private final boolean skipMetadataWrite;
+  // The schema (with meta fields appended when required) used to create the 
file writer.
+  private final HoodieSchema writerSchema;
   private final HoodieStorage storage;
   protected final WriteStatus writeStatus;
   private final HoodieRecordLocation newRecordLocation;
@@ -90,7 +93,7 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
 
   public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig 
writeConfig, String partitionPath, String fileId,
                                    String instantTime, int taskPartitionId, 
long taskId, long taskEpochId,
-                                   RowType rowType, boolean 
preserveHoodieMetadata, boolean skipMetadataWrite) {
+                                   HoodieSchema schema, boolean 
preserveHoodieMetadata, boolean skipMetadataWrite) {
     this.partitionPath = partitionPath;
     this.table = table;
     this.writeConfig = writeConfig;
@@ -102,10 +105,11 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
     this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.skipMetadataWrite = skipMetadataWrite;
+    this.writerSchema = schema;
     this.currTimer = HoodieTimer.start();
     this.storage = table.getStorage();
     this.path = makeNewPath(partitionPath);
-    this.eventTimeFieldGetter = initEventTimeFieldGetter(writeConfig, rowType);
+    this.eventTimeFieldGetter = initEventTimeFieldGetter(writeConfig, 
HoodieSchemaConverter.convertToRowType(writerSchema));
 
     this.writeStatus = new WriteStatus(table.shouldTrackSuccessRecords(),
         writeConfig.getWriteStatusFailureFraction());
@@ -122,7 +126,7 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
               table.getPartitionMetafileFormat());
       partitionMetadata.trySave();
       createMarkerFile(partitionPath, 
FSUtils.makeBaseFileName(this.instantTime, getWriteToken(), this.fileId, 
table.getBaseFileExtension()));
-      this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType, 
this.instantTime);
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, 
this.instantTime);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize file writer for 
path " + path, e);
     }
@@ -292,7 +296,7 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
   }
 
   protected HoodieRowDataFileWriter createNewFileWriter(
-      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, RowType 
rowType, String instantTime)
+      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, String 
instantTime)
       throws IOException {
     StoragePath storagePath = new StoragePath(path.toUri());
     return (HoodieRowDataFileWriter) HoodieFileWriterFactory.getFileWriter(
@@ -300,7 +304,7 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
         storagePath,
         hoodieTable.getStorage(),
         config,
-        HoodieSchemaConverter.convertToSchema(rowType).getNonNullType(),
+        writerSchema,
         hoodieTable.getTaskContextSupplier(),
         HoodieRecord.HoodieRecordType.FLINK);
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index 14ed278b70a2..b3c70c0c6186 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.api.WriteSupport;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -52,6 +53,11 @@ public class HoodieRowDataParquetWriteSupport extends 
RowDataParquetWriteSupport
     Map<String, String> extraMetadata =
         
bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
             .orElse(Collections.emptyMap());
+    String vectorColumnsMetadata = 
HoodieSchema.buildVectorColumnsMetadataValue(schema);
+    if (!vectorColumnsMetadata.isEmpty()) {
+      extraMetadata = new HashMap<>(extraMetadata);
+      extraMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, 
vectorColumnsMetadata);
+    }
 
     return new WriteSupport.FinalizedWriteContext(extraMetadata);
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
index 01d2806e7633..5043a6b9af66 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
@@ -22,10 +22,8 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.io.storage.row.parquet.ParquetRowDataWriter;
 import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
-import org.apache.hudi.util.HoodieSchemaConverter;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.RecordConsumer;
@@ -38,21 +36,21 @@ import java.util.HashMap;
  */
 public class RowDataParquetWriteSupport extends WriteSupport<RowData> {
 
-  private final RowType rowType;
-  private final MessageType schema;
+  protected final HoodieSchema schema;
+  private final MessageType fileSchema;
   private ParquetRowDataWriter writer;
   protected final Configuration hadoopConf;
 
-  public RowDataParquetWriteSupport(HoodieSchema hoodieSchema, Configuration 
config) {
+  public RowDataParquetWriteSupport(HoodieSchema schema, Configuration config) 
{
     super();
-    this.rowType = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+    this.schema = schema;
     this.hadoopConf = new Configuration(config);
-    this.schema = 
ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType);
+    this.fileSchema = 
ParquetSchemaConverter.convertToParquetMessageType("flink_schema", schema);
   }
 
   @Override
   public WriteContext init(Configuration configuration) {
-    return new WriteContext(schema, new HashMap<>());
+    return new WriteContext(fileSchema, new HashMap<>());
   }
 
   @Override
@@ -62,7 +60,7 @@ public class RowDataParquetWriteSupport extends 
WriteSupport<RowData> {
         hadoopConf.getBoolean(
             HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
             HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue());
-    this.writer = new ParquetRowDataWriter(recordConsumer, rowType, schema, 
utcTimestamp);
+    this.writer = new ParquetRowDataWriter(recordConsumer, utcTimestamp, 
schema);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index b3b612506b95..4a3c13064685 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -18,7 +18,12 @@
 
 package org.apache.hudi.io.storage.row.parquet;
 
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.VectorConversionUtils;
 
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalDataUtils;
@@ -35,7 +40,6 @@ import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.util.Preconditions;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.GroupType;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -64,16 +68,17 @@ public class ParquetRowDataWriter {
 
   public ParquetRowDataWriter(
       RecordConsumer recordConsumer,
-      RowType rowType,
-      GroupType schema,
-      boolean utcTimestamp) {
+      boolean utcTimestamp,
+      HoodieSchema schema) {
     this.recordConsumer = recordConsumer;
     this.utcTimestamp = utcTimestamp;
 
+    RowType rowType = HoodieSchemaConverter.convertToRowType(schema);
     this.filedWriters = new FieldWriter[rowType.getFieldCount()];
     this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
     for (int i = 0; i < rowType.getFieldCount(); i++) {
-      this.filedWriters[i] = createWriter(rowType.getTypeAt(i));
+      HoodieSchema fieldSchema = HoodieSchemaUtils.getFieldSchema(schema, 
fieldNames[i]);
+      this.filedWriters[i] = createWriter(rowType.getTypeAt(i), fieldSchema);
     }
   }
 
@@ -97,7 +102,9 @@ public class ParquetRowDataWriter {
     recordConsumer.endMessage();
   }
 
-  private FieldWriter createWriter(LogicalType t) {
+  private FieldWriter createWriter(LogicalType t, HoodieSchema oriFieldSchema) 
{
+    // strip the nullable wrapper so the element/key/value/record sub-schemas 
can be resolved directly
+    HoodieSchema fieldSchema = oriFieldSchema.getNonNullType();
     switch (t.getTypeRoot()) {
       case CHAR:
       case VARCHAR:
@@ -141,21 +148,27 @@ public class ParquetRowDataWriter {
           return new Timestamp96Writer(tsLtzPrecision);
         }
       case ARRAY:
+        if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+          return new VectorWriter((HoodieSchema.Vector) fieldSchema);
+        }
         ArrayType arrayType = (ArrayType) t;
         LogicalType elementType = arrayType.getElementType();
-        FieldWriter elementWriter = createWriter(elementType);
+        FieldWriter elementWriter = createWriter(elementType, 
fieldSchema.getElementType());
         return new ArrayWriter(elementWriter);
       case MAP:
         MapType mapType = (MapType) t;
         LogicalType keyType = mapType.getKeyType();
         LogicalType valueType = mapType.getValueType();
-        FieldWriter keyWriter = createWriter(keyType);
-        FieldWriter valueWriter = createWriter(valueType);
+        FieldWriter keyWriter = createWriter(keyType, 
fieldSchema.getKeyType());
+        FieldWriter valueWriter = createWriter(valueType, 
fieldSchema.getValueType());
         return new MapWriter(keyWriter, valueWriter);
       case ROW:
         RowType rowType = (RowType) t;
+        ValidationUtils.checkArgument(fieldSchema.getType() == 
HoodieSchemaType.RECORD || fieldSchema.getType() == HoodieSchemaType.BLOB,
+            "Hoodie schema should be RECORD or BLOB type.");
         FieldWriter[] fieldWriters = rowType.getFields().stream()
-            
.map(RowType.RowField::getType).map(this::createWriter).toArray(FieldWriter[]::new);
+            .map(field -> createWriter(field.getType(), 
HoodieSchemaUtils.getFieldSchema(fieldSchema, field.getName())))
+            .toArray(FieldWriter[]::new);
         String[] fieldNames = rowType.getFields().stream()
             .map(RowType.RowField::getName).toArray(String[]::new);
         return new RowWriter(fieldNames, fieldWriters);
@@ -274,6 +287,29 @@ public class ParquetRowDataWriter {
     }
   }
 
+  private class VectorWriter implements FieldWriter {
+    private final HoodieSchema.Vector vectorSchema;
+
+    private VectorWriter(HoodieSchema.Vector vectorSchema) {
+      this.vectorSchema = vectorSchema;
+    }
+
+    @Override
+    public void write(RowData row, int ordinal) {
+      writeVector(row.getArray(ordinal));
+    }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      writeVector(array.getArray(ordinal));
+    }
+
+    private void writeVector(ArrayData vectorArray) {
+      recordConsumer.addBinary(
+          
Binary.fromReusedByteArray(VectorConversionUtils.encodeVectorArrayData(vectorArray,
 vectorSchema)));
+    }
+  }
+
   private class IntWriter implements FieldWriter {
 
     @Override
@@ -619,4 +655,3 @@ public class ParquetRowDataWriter {
     }
   }
 }
-
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index dd19100f8ccd..5bad03d9253e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -20,7 +20,10 @@ package org.apache.hudi.io.storage.row.parquet;
 
 import org.apache.hudi.adapter.DataTypeAdapter;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.util.HoodieSchemaConverter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.table.api.DataTypes;
@@ -205,12 +208,32 @@ public class ParquetSchemaConverter {
     return Pair.of(keyValue.getType(MAP_KEY_NAME), 
keyValue.getType(MAP_VALUE_NAME));
   }
 
+  /**
+   * Converts a Flink row type to a Parquet message type.
+   *
+   * <p>This overload preserves the pre-VECTOR public API, but converting from
+   * {@link RowType} to {@link HoodieSchema} loses VECTOR logical metadata 
such as dimension and
+   * element type. Prefer {@link #convertToParquetMessageType(String, 
HoodieSchema)} whenever the
+   * caller already has a metadata-aware {@link HoodieSchema}.
+   */
   public static MessageType convertToParquetMessageType(String name, RowType 
rowType) {
+    HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(rowType);
+    return convertToParquetMessageType(name, hoodieSchema);
+  }
+
+  public static MessageType convertToParquetMessageType(String name, 
HoodieSchema oriRowSchema) {
+    HoodieSchema rowSchema = oriRowSchema.getNonNullType();
+    RowType rowType = HoodieSchemaConverter.convertToRowType(rowSchema);
     Type[] types = new Type[rowType.getFieldCount()];
     for (int i = 0; i < rowType.getFieldCount(); i++) {
       String fieldName = rowType.getFieldNames().get(i);
       LogicalType fieldType = rowType.getTypeAt(i);
-      types[i] = convertToParquetType(fieldName, fieldType, 
fieldType.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED);
+      HoodieSchema fieldSchema = HoodieSchemaUtils.getFieldSchema(rowSchema, 
fieldName);
+      types[i] = convertToParquetType(
+          fieldName,
+          fieldType,
+          fieldType.isNullable() ? Type.Repetition.OPTIONAL : 
Type.Repetition.REQUIRED,
+          fieldSchema);
     }
     return new MessageType(name, types);
   }
@@ -264,7 +287,8 @@ public class ParquetSchemaConverter {
   }
 
   private static Type convertToParquetType(
-      String name, LogicalType type, Type.Repetition repetition) {
+      String name, LogicalType type, Type.Repetition repetition, HoodieSchema 
oriFieldSchema) {
+    HoodieSchema fieldSchema = oriFieldSchema.getNonNullType();
     switch (type.getTypeRoot()) {
       case CHAR:
       case VARCHAR:
@@ -338,6 +362,15 @@ public class ParquetSchemaConverter {
               .named(name);
         }
       case ARRAY:
+        if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+          HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) fieldSchema;
+          int fixedSize = Math.multiplyExact(
+              vectorSchema.getDimension(),
+              vectorSchema.getVectorElementType().getElementSize());
+          return 
Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
repetition)
+              .length(fixedSize)
+              .named(name);
+        }
         // align with Spark And Avro regarding the standard mode array type, 
see:
         // 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
         //
@@ -350,7 +383,13 @@ public class ParquetSchemaConverter {
         Type.Repetition eleRepetition =
             arrayType.getElementType().isNullable() ? Type.Repetition.OPTIONAL 
: Type.Repetition.REQUIRED;
         return ConversionPatterns.listOfElements(
-            repetition, name, convertToParquetType("element", 
arrayType.getElementType(), eleRepetition));
+            repetition,
+            name,
+            convertToParquetType(
+                "element",
+                arrayType.getElementType(),
+                eleRepetition,
+                fieldSchema.getElementType()));
       case MAP:
         // <map-repetition> group <name> (MAP) {
         //   repeated group key_value {
@@ -366,15 +405,24 @@ public class ParquetSchemaConverter {
             .addField(
                 Types
                     .repeatedGroup()
-                    .addField(convertToParquetType("key", keyType, 
Type.Repetition.REQUIRED))
-                    .addField(convertToParquetType("value", valueType, 
valueType.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED))
+                    .addField(convertToParquetType(
+                        "key", keyType, Type.Repetition.REQUIRED, 
fieldSchema.getKeyType()))
+                    .addField(convertToParquetType(
+                        "value",
+                        valueType,
+                        valueType.isNullable() ? Type.Repetition.OPTIONAL : 
Type.Repetition.REQUIRED,
+                        fieldSchema.getValueType()))
                     .named("key_value"))
             .named(name);
       case ROW:
         RowType rowType = (RowType) type;
         Types.GroupBuilder<GroupType> builder = Types.buildGroup(repetition);
         rowType.getFields().forEach(field -> builder
-            .addField(convertToParquetType(field.getName(), field.getType(), 
field.getType().isNullable() ? Type.Repetition.OPTIONAL : 
Type.Repetition.REQUIRED)));
+            .addField(convertToParquetType(
+                field.getName(),
+                field.getType(),
+                field.getType().isNullable() ? Type.Repetition.OPTIONAL : 
Type.Repetition.REQUIRED,
+                HoodieSchemaUtils.getFieldSchema(fieldSchema, 
field.getName()))));
         return builder.named(name);
       default:
         if (DataTypeAdapter.isVariantType(type)) {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index c859675488ad..af992d1a18f4 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -44,7 +45,9 @@ import org.apache.flink.table.types.logical.TimestampType;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -79,6 +82,38 @@ public class HoodieSchemaConverter {
    * @return HoodieSchema matching this logical type
    */
   public static HoodieSchema convertToSchema(LogicalType logicalType, String 
rowName) {
+    return convertToSchema(logicalType, rowName, Collections.emptyMap());
+  }
+
+  /**
+   * Converts a Flink LogicalType into a HoodieSchema with top-level VECTOR 
columns.
+   *
+   * <p>The vector column option uses {@code colName[:dimension]} entries 
separated by commas.
+   * The dimension defaults to 128. The vector element type is inferred from 
the Flink array
+   * element type: FLOAT, DOUBLE, or TINYINT.
+   *
+   * @param logicalType    Flink logical type
+   * @param rowName        the record name
+   * @param vectorColumns  comma-separated vector column descriptors, or 
null/empty
+   * @return HoodieSchema matching this logical type
+   */
+  public static HoodieSchema convertToSchema(
+      LogicalType logicalType,
+      String rowName,
+      String vectorColumns) {
+    Map<String, Integer> vectorColumnMap = vectorColumns == null || 
vectorColumns.trim().isEmpty()
+        ? Collections.emptyMap() : VectorColumnParser.parse(vectorColumns);
+    validateVectorColumns(logicalType, vectorColumnMap);
+    return convertToSchema(logicalType, rowName, vectorColumnMap);
+  }
+
+  private static HoodieSchema convertToSchema(
+      LogicalType logicalType,
+      String rowName,
+      Map<String, Integer> vectorColumns) {
+    ValidationUtils.checkArgument(vectorColumns.isEmpty() || logicalType 
instanceof RowType,
+        "VECTOR columns can only be configured for top-level ROW schemas.");
+
     int precision;
     boolean nullable = logicalType.isNullable();
     HoodieSchema schema;
@@ -196,8 +231,13 @@ public class HoodieSchemaConverter {
           String fieldName = fieldNames.get(i);
           LogicalType fieldType = rowType.getTypeAt(i);
 
-          // Recursive call for field schema
-          HoodieSchema fieldSchema = convertToSchema(fieldType, rowName + "." 
+ fieldName);
+          HoodieSchema fieldSchema;
+          if (vectorColumns.containsKey(fieldName)) {
+            fieldSchema = VectorColumnParser.convertVectorField(fieldName, 
fieldType, vectorColumns.get(fieldName));
+          } else {
+            // Recursive call for field schema
+            fieldSchema = convertToSchema(fieldType, rowName + "." + 
fieldName, Collections.emptyMap());
+          }
 
           // Create field with or without default value
           HoodieSchemaField field;
@@ -215,13 +255,13 @@ public class HoodieSchemaConverter {
       case MULTISET:
       case MAP:
         LogicalType valueType = extractValueTypeForMap(logicalType);
-        HoodieSchema valueSchema = convertToSchema(valueType, rowName);
+        HoodieSchema valueSchema = convertToSchema(valueType, rowName, 
Collections.emptyMap());
         schema = HoodieSchema.createMap(valueSchema);
         break;
 
       case ARRAY:
         ArrayType arrayType = (ArrayType) logicalType;
-        HoodieSchema elementSchema = 
convertToSchema(arrayType.getElementType(), rowName);
+        HoodieSchema elementSchema = 
convertToSchema(arrayType.getElementType(), rowName, Collections.emptyMap());
         schema = HoodieSchema.createArray(elementSchema);
         break;
 
@@ -238,6 +278,28 @@ public class HoodieSchemaConverter {
     return nullable ? HoodieSchema.createNullable(schema) : schema;
   }
 
+  /**
+   * Validates that all configured VECTOR columns resolve to top-level fields 
of the row schema.
+   *
+   * <p>Invoked by {@link #convertToSchema(LogicalType, String, String)} 
before schema inference so
+   * an unknown column is rejected instead of being silently ignored during 
conversion.
+   *
+   * @param logicalType   Flink logical type
+   * @param vectorColumns parsed vector columns (normalized column name to 
dimension), may be empty
+   */
+  private static void validateVectorColumns(LogicalType logicalType, 
Map<String, Integer> vectorColumns) {
+    if (vectorColumns.isEmpty()) {
+      return;
+    }
+    List<String> normalizedFieldNames = ((RowType) 
logicalType).getFieldNames();
+    vectorColumns.keySet().stream()
+        .filter(vectorColumn -> !normalizedFieldNames.contains(vectorColumn))
+        .findFirst()
+        .ifPresent(vectorColumn -> {
+          throw new IllegalArgumentException("VECTOR column '" + vectorColumn 
+ "' does not exist in the table schema.");
+        });
+  }
+
   /**
    * Extracts value type for map conversion.
    * Maps must have string keys for Avro/HoodieSchema compatibility.
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index 050ad483de10..fcd435532ede 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -331,6 +331,10 @@ public class RowDataToAvroConverters {
 
       @Override
       public Object convert(HoodieSchema schema, Object object) {
+        if (schema.getType() == HoodieSchemaType.VECTOR) {
+          HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+          return new GenericData.Fixed(schema.toAvroSchema(), 
VectorConversionUtils.encodeVectorArrayData((ArrayData) object, vectorSchema));
+        }
         final HoodieSchema elementSchema = schema.getElementType();
         ArrayData arrayData = (ArrayData) object;
         List<Object> list = new ArrayList<>();
@@ -394,4 +398,3 @@ public class RowDataToAvroConverters {
     };
   }
 }
-
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorColumnParser.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorColumnParser.java
new file mode 100644
index 000000000000..68724be54cb6
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorColumnParser.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Parses the {@code hoodie.vector.columns} table option and converts the configured top-level
+ * Flink array fields into Hoodie VECTOR schema fields.
+ *
+ * <p>The option uses {@code colName[:dimension]} entries separated by commas, for example
+ * {@code embedding:4,features:3,codes:4}. The dimension defaults to {@value #DEFAULT_VECTOR_DIMENSION}
+ * when omitted. Column names are matched case-insensitively. The vector element type is inferred
+ * from the Flink array element type: {@code ARRAY<FLOAT>}, {@code ARRAY<DOUBLE>}, or
+ * {@code ARRAY<TINYINT>} map to FLOAT, DOUBLE, and INT8 respectively.
+ *
+ * <p>The parser validates the descriptor syntax (well-formed entries, no duplicate columns,
+ * positive dimensions); existence of the referenced columns in the table schema is validated by
+ * {@link HoodieSchemaConverter} during schema conversion.
+ *
+ * <p>This is a stateless utility class and cannot be instantiated.
+ */
+public final class VectorColumnParser {
+
+  /** Dimension used when a descriptor omits the {@code :dimension} suffix. */
+  public static final int DEFAULT_VECTOR_DIMENSION = 128;
+
+  private VectorColumnParser() {
+  }
+
+  /**
+   * Parses the {@code hoodie.vector.columns} descriptor string into a map from the
+   * (lower-cased) column name to its vector dimension, preserving declaration order.
+   *
+   * <p>Empty entries (for example between consecutive commas) are skipped, and a {@code null}
+   * or empty descriptor yields an empty map.
+   *
+   * @param vectorColumns comma-separated {@code colName[:dimension]} descriptors; may be {@code null}
+   * @return map from normalized column name to dimension
+   * @throws IllegalArgumentException if a descriptor is malformed, duplicated, or has a non-positive dimension
+   */
+  public static Map<String, Integer> parse(String vectorColumns) {
+    Map<String, Integer> parsed = new LinkedHashMap<>();
+    if (vectorColumns == null) {
+      // Treat an absent option like an empty one instead of failing with an NPE in split().
+      return parsed;
+    }
+    for (String rawEntry : vectorColumns.split(",")) {
+      String entry = rawEntry.trim();
+      if (entry.isEmpty()) {
+        continue;
+      }
+      // Limit -1 keeps trailing empty parts so "col:" is reported as an empty dimension.
+      String[] parts = entry.split(":", -1);
+      if (parts.length > 2 || parts[0].trim().isEmpty()) {
+        throw new IllegalArgumentException(
+            "Invalid VECTOR column descriptor '" + entry + "'. Expected format: columnName[:dimension].");
+      }
+      String columnName = parts[0].trim();
+      String normalizedColumnName = columnName.toLowerCase(Locale.ROOT);
+      if (parsed.containsKey(normalizedColumnName)) {
+        throw new IllegalArgumentException("Duplicate VECTOR column descriptor for column: " + columnName);
+      }
+      int dimension = DEFAULT_VECTOR_DIMENSION;
+      if (parts.length == 2) {
+        String dimensionText = parts[1].trim();
+        if (dimensionText.isEmpty()) {
+          throw new IllegalArgumentException(
+              "Invalid VECTOR column descriptor '" + entry + "'. Dimension must not be empty.");
+        }
+        try {
+          dimension = Integer.parseInt(dimensionText);
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid VECTOR dimension for column '" + columnName + "': " + dimensionText, e);
+        }
+      }
+      if (dimension <= 0) {
+        throw new IllegalArgumentException("VECTOR dimension must be positive for column '" + columnName + "': " + dimension);
+      }
+      parsed.put(normalizedColumnName, dimension);
+    }
+    return parsed;
+  }
+
+  /**
+   * Converts a Flink array field into a Hoodie VECTOR schema field with the given dimension.
+   *
+   * @param fieldName the column name (for error messages)
+   * @param fieldType the Flink field type, which must be an {@link ArrayType}
+   * @param dimension the configured vector dimension
+   * @return the corresponding VECTOR {@link HoodieSchema}, wrapped as nullable when the field is nullable
+   * @throws IllegalArgumentException if the field is not an array of FLOAT, DOUBLE, or TINYINT
+   */
+  public static HoodieSchema convertVectorField(String fieldName, LogicalType fieldType, int dimension) {
+    if (!(fieldType instanceof ArrayType)) {
+      throw new IllegalArgumentException(
+          "VECTOR column '" + fieldName + "' must be declared as ARRAY<FLOAT>, ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got: "
+              + fieldType.asSummaryString());
+    }
+    HoodieSchema.Vector.VectorElementType elementType = inferVectorElementType(fieldName, ((ArrayType) fieldType).getElementType());
+    HoodieSchema vectorSchema = HoodieSchema.createVector(dimension, elementType);
+    return fieldType.isNullable() ? HoodieSchema.createNullable(vectorSchema) : vectorSchema;
+  }
+
+  /**
+   * Maps a Flink array element type to the matching VECTOR element type.
+   *
+   * @throws IllegalArgumentException if the element type is not FLOAT, DOUBLE, or TINYINT
+   */
+  private static HoodieSchema.Vector.VectorElementType inferVectorElementType(String fieldName, LogicalType elementType) {
+    switch (elementType.getTypeRoot()) {
+      case FLOAT:
+        return HoodieSchema.Vector.VectorElementType.FLOAT;
+      case DOUBLE:
+        return HoodieSchema.Vector.VectorElementType.DOUBLE;
+      case TINYINT:
+        return HoodieSchema.Vector.VectorElementType.INT8;
+      default:
+        throw new IllegalArgumentException(
+            "VECTOR column '" + fieldName + "' must use ARRAY<FLOAT>, ARRAY<DOUBLE>, or ARRAY<TINYINT>, but got ARRAY<"
+                + elementType.asSummaryString() + ">.");
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
index 402d72372931..c3721bae71ea 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java
@@ -21,11 +21,13 @@ package org.apache.hudi.util;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.util.HoodieVectorUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -35,6 +37,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
@@ -273,6 +276,38 @@ public final class VectorConversionUtils {
     throw new UnsupportedOperationException("Unsupported decoded vector array 
type: " + vectorArray.getClass());
   }
 
+  /**
+   * Encodes Flink array data into the canonical Hudi VECTOR fixed-bytes 
representation.
+   */
+  public static byte[] encodeVectorArrayData(ArrayData arrayData, 
HoodieSchema.Vector vectorSchema) {
+    int dimension = vectorSchema.getDimension();
+    HoodieSchema.Vector.VectorElementType elementType = 
vectorSchema.getVectorElementType();
+    ValidationUtils.checkArgument(arrayData.size() == dimension,
+        () -> "Vector dimension mismatch: schema expects " + dimension + " 
elements but got " + arrayData.size());
+    int bufferSize = Math.multiplyExact(dimension, 
elementType.getElementSize());
+    ByteBuffer buffer = 
ByteBuffer.allocate(bufferSize).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+    switch (elementType) {
+      case FLOAT:
+        for (int i = 0; i < dimension; i++) {
+          buffer.putFloat(arrayData.getFloat(i));
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < dimension; i++) {
+          buffer.putDouble(arrayData.getDouble(i));
+        }
+        break;
+      case INT8:
+        for (int i = 0; i < dimension; i++) {
+          buffer.put(arrayData.getByte(i));
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported VECTOR element 
type: " + elementType);
+    }
+    return buffer.array();
+  }
+
   /**
    * Validates that a Flink ARRAY logical type uses the element type required 
by the VECTOR schema.
    *
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
index ecd3ab67d4a1..b8d29dfb2c34 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
@@ -20,24 +20,22 @@ package org.apache.hudi.io.storage.row;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodiePayloadProps;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
+import org.apache.hudi.util.HoodieSchemaConverter;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -69,27 +67,6 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
     cleanupResources();
   }
 
-  /**
-   * Adds Hoodie metadata fields to the row type, matching production behavior 
in BulkInsertWriterHelper.
-   */
-  private static RowType addMetadataFields(RowType rowType, boolean 
withOperationField) {
-    List<RowType.RowField> mergedFields = new ArrayList<>();
-
-    LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
-    mergedFields.add(new 
RowType.RowField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, metadataFieldType));
-    mergedFields.add(new 
RowType.RowField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, metadataFieldType));
-    mergedFields.add(new 
RowType.RowField(HoodieRecord.RECORD_KEY_METADATA_FIELD, metadataFieldType));
-    mergedFields.add(new 
RowType.RowField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
metadataFieldType));
-    mergedFields.add(new 
RowType.RowField(HoodieRecord.FILENAME_METADATA_FIELD, metadataFieldType));
-
-    if (withOperationField) {
-      mergedFields.add(new 
RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType));
-    }
-
-    mergedFields.addAll(rowType.getFields());
-    return new RowType(mergedFields);
-  }
-
   @Test
   public void testEventTimeFieldIndexWithDoubleType() throws Exception {
     // Schema with DOUBLE event_time field
@@ -101,8 +78,6 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    // Add metadata fields like production code does
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     Properties props = new Properties();
     props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"event_time");
@@ -110,13 +85,15 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         .withPath(basePath)
         .withProperties(props)
         .withEmbeddedTimelineServerEnabled(false)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         .build();
 
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context, 
metaClient);
 
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     assertNotNull(handle);
     // Verify the handle was created successfully with event time configured
@@ -133,7 +110,6 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     Properties props = new Properties();
     props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"event_time");
@@ -141,6 +117,7 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         .withPath(basePath)
         .withProperties(props)
         .withEmbeddedTimelineServerEnabled(false)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         .build();
 
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context, 
metaClient);
@@ -148,7 +125,8 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
     // Should create handle but log warning about unsupported type
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     assertNotNull(handle);
   }
@@ -163,7 +141,6 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     Properties props = new Properties();
     props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"event_time");
@@ -171,6 +148,7 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         .withPath(basePath)
         .withProperties(props)
         .withEmbeddedTimelineServerEnabled(false)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         .build();
 
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context, 
metaClient);
@@ -178,7 +156,8 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
     // Should create handle but log warning about missing field
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     assertNotNull(handle);
   }
@@ -194,11 +173,11 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .withPath(basePath)
         .withEmbeddedTimelineServerEnabled(false)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         // No event time field configured
         .build();
 
@@ -206,7 +185,8 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
 
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     assertNotNull(handle);
   }
@@ -222,13 +202,13 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     Properties props = new Properties();
     props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"event_time");
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .withPath(basePath)
         .withProperties(props)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         .withEmbeddedTimelineServerEnabled(false)
         .build();
 
@@ -236,7 +216,8 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
 
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     // Create test records with event time
     double eventTimeSeconds1 = 1729512000.0; // 2024-10-21 12:00:00
@@ -293,7 +274,6 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     Properties props = new Properties();
     props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"event_time");
@@ -301,13 +281,15 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         .withPath(basePath)
         .withProperties(props)
         .withEmbeddedTimelineServerEnabled(false)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         .build();
 
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context, 
metaClient);
 
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     long eventTimeMillis1 = 1729512000000L;
     long eventTimeMillis2 = 1729515600000L;
@@ -350,7 +332,6 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     Properties props = new Properties();
     props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"event_time");
@@ -358,13 +339,15 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         .withPath(basePath)
         .withProperties(props)
         .withEmbeddedTimelineServerEnabled(false)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         .build();
 
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context, 
metaClient);
 
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     // Create record with null event time
     GenericRowData row = new GenericRowData(5);
@@ -400,11 +383,11 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
         DataTypes.FIELD("partition", DataTypes.VARCHAR(20))
     ).notNull();
     RowType baseRowType = (RowType) dataType.getLogicalType();
-    RowType rowTypeWithMetadata = addMetadataFields(baseRowType, false);
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .withPath(basePath)
         .withEmbeddedTimelineServerEnabled(false)
+        
.withSchema(HoodieSchemaConverter.convertToSchema(baseRowType).getAvroSchema().toString())
         // No event time field configured
         .build();
 
@@ -412,7 +395,8 @@ public class TestHoodieRowDataCreateHandle extends 
HoodieFlinkClientTestHarness
 
     HoodieRowDataCreateHandle handle = new HoodieRowDataCreateHandle(
         table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
-        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, 
false);
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID,
+        
HoodieSchemaUtils.addMetadataFields(HoodieSchemaConverter.convertToSchema(baseRowType),
 false), false, false);
 
     // Create record with timestamp
     GenericRowData row = new GenericRowData(5);
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
index 67bf8e09cb6d..37ee8b2a59ed 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataParquetConfigInjector.java
@@ -22,11 +22,15 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.LocalTaskContextSupplier;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.testutils.DisableDictionaryInjector;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.HoodieParquetConfigInjector;
 import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -35,6 +39,7 @@ import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
 import org.apache.hudi.util.HoodieSchemaConverter;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
@@ -46,13 +51,17 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.PrimitiveType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.util.Arrays;
 
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.FLINK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -221,4 +230,45 @@ public class TestHoodieRowDataParquetConfigInjector 
extends HoodieFlinkClientTes
     // Verify the parquet file was created
     assertTrue(storage.exists(parquetPath));
   }
+
+  /**
+   * Verifies that a FLINK file writer created from a HoodieSchema with a VECTOR column writes the
+   * column as a fixed-length byte array and records the VECTOR columns in the parquet footer.
+   */
+  @Test
+  public void testGetFileWriterPropagatesHoodieSchemaForVectorColumns() throws Exception {
+    final String instantTime = "104";
+    HoodieStorage storage = HoodieTestUtils.getStorage(tmpDir.toString());
+    final StoragePath parquetPath = new StoragePath(
+        basePath + "/partition/path/test_vector_schema_" + instantTime + ".parquet");
+
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "vector_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("embedding", HoodieSchema.createVector(2))));
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .build();
+
+    HoodieFileWriter writer = HoodieFileWriterFactory.getFileWriter(
+        instantTime, parquetPath, storage, config, hoodieSchema, new LocalTaskContextSupplier(), FLINK);
+    assertTrue(writer instanceof HoodieRowDataParquetWriter);
+
+    GenericRowData row = new GenericRowData(2);
+    row.setField(0, 1);
+    row.setField(1, new GenericArrayData(new float[] {1.0f, 2.0f}));
+    // Close the writer even if the write fails so the file handle is not leaked.
+    try {
+      ((HoodieRowDataParquetWriter) writer).write(row);
+    } finally {
+      writer.close();
+    }
+
+    ParquetMetadata metadata;
+    // try-with-resources releases the reader even when reading the footer throws.
+    try (ParquetFileReader reader = ParquetFileReader.open(new Configuration(), new Path(parquetPath.toUri()))) {
+      metadata = reader.getFooter();
+    }
+
+    PrimitiveType embeddingType = metadata.getFileMetaData().getSchema().getType("embedding").asPrimitiveType();
+    assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, embeddingType.getPrimitiveTypeName());
+    // Two 4-byte elements.
+    assertEquals(8, embeddingType.getTypeLength());
+    assertEquals("embedding:VECTOR(2)",
+        metadata.getFileMetaData().getKeyValueMetaData().get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY));
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 403b84068496..66c6f9e86263 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -20,7 +20,10 @@ package org.apache.hudi.io.storage.row.parquet;
 
 import org.apache.hudi.adapter.DataTypeAdapter;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.row.HoodieRowDataParquetWriteSupport;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
@@ -38,6 +41,7 @@ import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
@@ -48,6 +52,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -84,7 +89,7 @@ public class TestParquetSchemaConverter {
           new MapType(
               new VarCharType(VarCharType.MAX_LENGTH),
               new VarCharType(VarCharType.MAX_LENGTH)),
-          new MapType(new IntType(), new BooleanType()),
+          new MapType(new VarCharType(VarCharType.MAX_LENGTH), new 
BooleanType()),
           RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
 
   private static final RowType NESTED_ARRAY_MAP_TYPE =
@@ -121,7 +126,9 @@ public class TestParquetSchemaConverter {
   void testParquetFlinkTypeConverting() {
     MessageType messageType = 
ParquetSchemaConverter.convertToParquetMessageType("flink_schema", ROW_TYPE);
     RowType rowType = ParquetSchemaConverter.convertToRowType(messageType);
-    assertThat(rowType, is(ROW_TYPE));
+    // The parquet schema is derived from the avro-based HoodieSchema, which 
has no byte/short
+    // types, so TINYINT/SMALLINT (including as array elements) are normalized 
to INT on the round trip.
+    assertThat(rowType, is((RowType) normalizeByteShortToInt(ROW_TYPE)));
 
     messageType = 
ParquetSchemaConverter.convertToParquetMessageType("flink_schema", 
NESTED_ARRAY_MAP_TYPE);
     rowType = ParquetSchemaConverter.convertToRowType(messageType);
@@ -134,7 +141,7 @@ public class TestParquetSchemaConverter {
         DataTypes.FIELD("f_array",
             DataTypes.ARRAY(DataTypes.CHAR(10).notNull())),
         DataTypes.FIELD("f_map",
-            DataTypes.MAP(DataTypes.INT(), DataTypes.VARCHAR(20))),
+            DataTypes.MAP(DataTypes.STRING(), DataTypes.VARCHAR(20))),
         DataTypes.FIELD("f_row",
             DataTypes.ROW(
                 DataTypes.FIELD("f_row_f0", DataTypes.INT()),
@@ -154,7 +161,7 @@ public class TestParquetSchemaConverter {
         + "  }\n"
         + "  optional group f_map (MAP) {\n"
         + "    repeated group key_value {\n"
-        + "      required int32 key;\n"
+        + "      required binary key (STRING);\n"
         + "      optional binary value (STRING);\n"
         + "    }\n"
         + "  }\n"
@@ -179,11 +186,12 @@ public class TestParquetSchemaConverter {
                 DataTypes.FIELD("f_array_f1", DataTypes.VARCHAR(10).notNull()),
                 DataTypes.FIELD("f_array_f3", 
DataTypes.ARRAY(DataTypes.CHAR(10).notNull()))).notNull())),
         DataTypes.FIELD("f_map",
-            DataTypes.MAP(DataTypes.INT(), DataTypes.ROW(
+            DataTypes.MAP(DataTypes.STRING(), DataTypes.ROW(
                 DataTypes.FIELD("f_map_f0", DataTypes.INT()),
                 DataTypes.FIELD("f_map_f1", 
DataTypes.VARCHAR(10))).notNull())));
 
-    org.apache.parquet.schema.MessageType messageType = 
ParquetSchemaConverter.convertToParquetMessageType("converted", (RowType) 
dataType.getLogicalType());
+    org.apache.parquet.schema.MessageType messageType =
+        ParquetSchemaConverter.convertToParquetMessageType("converted", 
(RowType) dataType.getLogicalType());
 
     assertThat(messageType.getColumns().size(), is(6));
     final String expected = "message converted {\n"
@@ -202,7 +210,7 @@ public class TestParquetSchemaConverter {
         + "  }\n"
         + "  optional group f_map (MAP) {\n"
         + "    repeated group key_value {\n"
-        + "      required int32 key;\n"
+        + "      required binary key (STRING);\n"
         + "      required group value {\n"
         + "        optional int32 f_map_f0;\n"
         + "        optional binary f_map_f1 (STRING);\n"
@@ -213,19 +221,54 @@ public class TestParquetSchemaConverter {
     assertThat(messageType.toString(), is(expected));
   }
 
+  @Test
+  void testConvertVectorColumnsWithHoodieSchema() {
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "vector_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("embedding", HoodieSchema.createVector(2)),
+            HoodieSchemaField.of("features", HoodieSchema.createVector(2, 
HoodieSchema.Vector.VectorElementType.DOUBLE))));
+    MessageType messageType = 
ParquetSchemaConverter.convertToParquetMessageType("converted", hoodieSchema);
+
+    PrimitiveType embeddingType = 
messageType.getType("embedding").asPrimitiveType();
+    assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
embeddingType.getPrimitiveTypeName());
+    assertEquals(8, embeddingType.getTypeLength());
+    PrimitiveType featuresType = 
messageType.getType("features").asPrimitiveType();
+    assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
featuresType.getPrimitiveTypeName());
+    assertEquals(16, featuresType.getTypeLength());
+  }
+
+  @Test
+  void testVectorFooterMetadataComesFromHoodieSchema() {
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "vector_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("embedding", HoodieSchema.createVector(2))));
+
+    HoodieRowDataParquetWriteSupport writeSupport =
+        new HoodieRowDataParquetWriteSupport(new Configuration(), 
hoodieSchema, null);
+    Map<String, String> metadata = 
writeSupport.finalizeWrite().getExtraMetaData();
+
+    assertEquals("embedding:VECTOR(2)", 
metadata.get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY));
+  }
+
   @Test
   void testConvertTimestampTypes() {
     DataType dataType = DataTypes.ROW(
         DataTypes.FIELD("ts_3", DataTypes.TIMESTAMP(3)),
-        DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)),
-        DataTypes.FIELD("ts_9", DataTypes.TIMESTAMP(9)));
+        DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)));
     org.apache.parquet.schema.MessageType messageType =
         ParquetSchemaConverter.convertToParquetMessageType("converted", 
(RowType) dataType.getLogicalType());
-    assertThat(messageType.getColumns().size(), is(3));
+    assertThat(messageType.getColumns().size(), is(2));
     final String expected = "message converted {\n"
         + "  optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
         + "  optional int64 ts_6 (TIMESTAMP(MICROS,true));\n"
-        + "  optional int96 ts_9;\n"
         + "}\n";
     assertThat(messageType.toString(), is(expected));
   }
@@ -364,4 +407,24 @@ public class TestParquetSchemaConverter {
         "Error message should mention VARIANT");
   }
 
+  /**
+   * Replaces TINYINT/SMALLINT with INT recursively, mirroring the lossy 
conversion that happens
+   * when a parquet schema is built from the avro-based {@link HoodieSchema}.
+   * The problem is tracked here: <a 
href="https://github.com/apache/hudi/issues/18974";>Issue#18974</a>.
+   */
+  private static LogicalType normalizeByteShortToInt(LogicalType type) {
+    if (type instanceof TinyIntType || type instanceof SmallIntType) {
+      return new IntType();
+    } else if (type instanceof ArrayType) {
+      return new ArrayType(normalizeByteShortToInt(((ArrayType) 
type).getElementType()));
+    } else if (type instanceof RowType) {
+      RowType row = (RowType) type;
+      LogicalType[] children = new LogicalType[row.getFieldCount()];
+      for (int i = 0; i < children.length; i++) {
+        children[i] = normalizeByteShortToInt(row.getTypeAt(i));
+      }
+      return RowType.of(children);
+    }
+    return type;
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 25f05eda627c..22a7c711ea4e 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -524,6 +524,63 @@ public class TestHoodieSchemaConverter {
     assertFalse(rowType.getTypeAt(1).isNullable());
   }
 
+  @Test
+  public void testConvertVectorColumnsFromFlinkSchema() {
+    RowType rowType = (RowType) DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.STRING().notNull()),
+        DataTypes.FIELD("embedding", 
DataTypes.ARRAY(DataTypes.FLOAT().notNull()).notNull()),
+        DataTypes.FIELD("features", 
DataTypes.ARRAY(DataTypes.DOUBLE().notNull()).nullable()),
+        DataTypes.FIELD("codes", 
DataTypes.ARRAY(DataTypes.TINYINT().notNull()).notNull()))
+        .notNull()
+        .getLogicalType();
+
+    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType, 
"test_record", " embedding , features:64,codes:32 ");
+
+    HoodieSchema embedding = 
schema.getField("embedding").get().schema().getNonNullType();
+    assertEquals(HoodieSchemaType.VECTOR, embedding.getType());
+    assertEquals(128, ((HoodieSchema.Vector) embedding).getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT,
+        ((HoodieSchema.Vector) embedding).getVectorElementType());
+
+    HoodieSchema features = schema.getField("features").get().schema();
+    assertTrue(features.isNullable());
+    HoodieSchema featuresVector = features.getNonNullType();
+    assertEquals(64, ((HoodieSchema.Vector) featuresVector).getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE,
+        ((HoodieSchema.Vector) featuresVector).getVectorElementType());
+
+    HoodieSchema codes = 
schema.getField("codes").get().schema().getNonNullType();
+    assertEquals(32, ((HoodieSchema.Vector) codes).getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.INT8,
+        ((HoodieSchema.Vector) codes).getVectorElementType());
+
+    HoodieSchema defaultRecordSchema = 
HoodieSchemaConverter.convertToSchema(rowType, "record", "embedding:2");
+    assertEquals("record", defaultRecordSchema.getName());
+    HoodieSchema defaultEmbedding = 
defaultRecordSchema.getField("embedding").get().schema().getNonNullType();
+    assertEquals(2, ((HoodieSchema.Vector) defaultEmbedding).getDimension());
+  }
+
+  @Test
+  public void testConvertVectorColumnsValidation() {
+    RowType rowType = (RowType) DataTypes.ROW(
+        DataTypes.FIELD("embedding", 
DataTypes.ARRAY(DataTypes.FLOAT().notNull()).notNull()),
+        DataTypes.FIELD("tags", 
DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull()),
+        DataTypes.FIELD("plain", DataTypes.STRING()))
+        .notNull()
+        .getLogicalType();
+
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieSchemaConverter.convertToSchema(rowType, "test_record", 
"embedding:0"));
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieSchemaConverter.convertToSchema(rowType, "test_record", 
"embedding,embedding:64"));
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieSchemaConverter.convertToSchema(rowType, "test_record", 
"missing"));
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieSchemaConverter.convertToSchema(rowType, "test_record", 
"plain"));
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieSchemaConverter.convertToSchema(rowType, "test_record", 
"tags"));
+  }
+
   private void assertVectorArray(DataType dataType, LogicalTypeRoot 
elementTypeRoot, boolean nullable) {
     ArrayType arrayType = assertInstanceOf(ArrayType.class, 
dataType.getLogicalType());
     assertEquals(elementTypeRoot, arrayType.getElementType().getTypeRoot());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 98bd1df034dc..a191842b6c5b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -73,6 +73,19 @@ public final class HoodieSchemaUtils {
     throw new UnsupportedOperationException("Utility class cannot be 
instantiated");
   }
 
+  /**
+   * Returns the schema for the specified field.
+   *
+   * @param schema    record schema that contains the field
+   * @param fieldName field name to resolve
+   * @return schema of the resolved field
+   * @throws HoodieSchemaException if the field does not exist in the schema
+   */
+  public static HoodieSchema getFieldSchema(HoodieSchema schema, String 
fieldName) {
+    return 
schema.getNonNullType().getField(fieldName).map(HoodieSchemaField::schema)
+        .orElseThrow(() -> new HoodieSchemaException("Field " + fieldName + " 
doesn't exist in schema: " + schema));
+  }
+
   /**
    * Creates a write schema for Hudi operations, adding necessary metadata 
fields.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 357fa51b603b..2a4c5f8815fd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -374,6 +374,14 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Source avro schema string, the parsed schema is used 
for deserialization");
 
+  public static final ConfigOption<String> VECTOR_COLUMNS = ConfigOptions
+      .key("hoodie.vector.columns")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Comma-separated top-level VECTOR columns in the format 
columnName[:dimension]. "
+          + "The dimension defaults to 128 when omitted. The element type is 
inferred from the Flink "
+          + "ARRAY element type: FLOAT, DOUBLE, or TINYINT.");
+
   public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
   public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
   public static final String QUERY_TYPE_INCREMENTAL = "incremental";
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 5c324235251c..a6eee1caf517 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -83,7 +83,7 @@ public class BucketBulkInsertWriterHelper extends 
BulkInsertWriterHelper {
         close();
       }
       HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata, isAppendMode && !populateMetaFields);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, 
writerSchema, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
       handles.put(fileId, rowCreateHandle);
     }
     return handles.get(fileId);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 8cb5ef9fdeb4..14e7ea59191c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -20,6 +20,8 @@ package org.apache.hudi.sink.bulk;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -29,7 +31,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -66,7 +68,8 @@ public class BulkInsertWriterHelper {
   protected final long taskEpochId;
   protected final HoodieTable hoodieTable;
   protected final HoodieWriteConfig writeConfig;
-  protected final RowType rowType;
+  // table schema handed to the write handles.
+  protected final HoodieSchema writerSchema;
   protected final boolean preserveHoodieMetadata;
   protected final boolean isAppendMode;
   // used for Append mode only, if true then only initial row data without 
metacolumns is written
@@ -104,9 +107,13 @@ public class BulkInsertWriterHelper {
     this.taskEpochId = taskEpochId;
     this.isAppendMode = OptionsResolver.isAppendMode(conf);
     this.populateMetaFields = writeConfig.populateMetaFields();
-    this.rowType = preserveHoodieMetadata || (isAppendMode && 
!populateMetaFields)
-        ? rowType
-        : DataTypeUtils.addMetadataFields(rowType, 
writeConfig.allowOperationMetadataField());
+    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(
+        rowType,
+        
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+        conf.get(FlinkOptions.VECTOR_COLUMNS));
+    this.writerSchema = preserveHoodieMetadata || (isAppendMode && 
!populateMetaFields)
+        ? schema
+        : HoodieSchemaUtils.addMetadataFields(schema, 
writeConfig.allowOperationMetadataField());
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && 
conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
@@ -147,7 +154,7 @@ public class BulkInsertWriterHelper {
       log.info("Creating new file for partition path " + partitionPath);
       writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
       HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextFileId(),
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata, isAppendMode && !populateMetaFields);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, 
writerSchema, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
       handles.put(partitionPath, rowCreateHandle);
 
       writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
@@ -208,7 +215,7 @@ public class BulkInsertWriterHelper {
   private HoodieRowDataCreateHandle createWriteHandle(String  partitionPath) {
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
     HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextFileId(),
-        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata, isAppendMode && !populateMetaFields);
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, 
writerSchema, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
     return rowCreateHandle;
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 2fb4031cd7b4..12a0b1092588 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -520,7 +520,10 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   private static void inferAvroSchema(Configuration conf, LogicalType rowType) 
{
     if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isEmpty()
         && conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isEmpty()) {
-      String inferredSchema = HoodieSchemaConverter.convertToSchema(rowType, 
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString();
+      String inferredSchema = HoodieSchemaConverter.convertToSchema(
+          rowType,
+          
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+          conf.get(FlinkOptions.VECTOR_COLUMNS)).toString();
       conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
     }
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 9dd79cde9f2d..76d4bae29a88 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -685,7 +685,10 @@ public class HoodieTableSource extends FileIndexReader 
implements
   }
 
   private HoodieSchema inferSchemaFromDdl() {
-    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(this.tableRowType);
+    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(
+        this.tableRowType,
+        
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+        conf.get(FlinkOptions.VECTOR_COLUMNS));
     return HoodieSchemaUtils.addMetadataFields(schema, 
conf.get(FlinkOptions.CHANGELOG_ENABLED));
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index a8bf340f64e2..e1eca3ded180 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -70,6 +70,7 @@ import 
org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -319,9 +320,11 @@ public class HoodieCatalog extends AbstractCatalog {
         && !OptionsResolver.isAppendMode(conf)) {
       throw new CatalogException("Primary key definition is missing");
     }
+    LogicalType rowType = 
resolvedSchema.toPhysicalRowDataType().getLogicalType();
     final String avroSchema = HoodieSchemaConverter.convertToSchema(
-        resolvedSchema.toPhysicalRowDataType().getLogicalType(),
-        
HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName())).toString();
+        rowType,
+        HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName()),
+        conf.get(FlinkOptions.VECTOR_COLUMNS)).toString();
     conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
 
     // stores two copies of options:
@@ -619,9 +622,11 @@ public class HoodieCatalog extends AbstractCatalog {
   private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable) {
     Map<String, String> options = newCatalogTable.getOptions();
     ResolvedCatalogTable resolvedTable =  (ResolvedCatalogTable) 
newCatalogTable;
+    LogicalType rowType = 
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType();
     final String avroSchema = HoodieSchemaConverter.convertToSchema(
-        
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
-        
HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName())).toString();
+        rowType,
+        HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName()),
+        options.get(FlinkOptions.VECTOR_COLUMNS.key())).toString();
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema);
     java.util.Optional<UniqueConstraint> pkConstraintOpt = 
resolvedTable.getResolvedSchema().getPrimaryKey();
     if (pkConstraintOpt.isPresent()) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c42bdf281174..b34bc705bb29 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -80,6 +80,7 @@ import 
org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -494,9 +495,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
 
   private HoodieTableMetaClient initTableIfNotExists(ObjectPath tablePath, 
CatalogTable catalogTable) {
     Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
+    RowType rowType = 
DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema());
     final String avroSchema = HoodieSchemaConverter.convertToSchema(
-        DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
-        
HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName())).toString();
+        rowType,
+        HoodieSchemaUtils.getRecordQualifiedName(tablePath.getObjectName()),
+        flinkConf.get(FlinkOptions.VECTOR_COLUMNS)).toString();
     flinkConf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
 
     // stores two copies of options:
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index b2f5d428cdaf..6b4749c2f26e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -203,7 +203,10 @@ public class TableOptionProperties {
       List<String> partitionKeys,
       boolean withOperationField) {
     RowType rowType = 
supplementMetaFields(DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
 withOperationField);
-    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
+    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(
+        rowType,
+        "record",
+        catalogTable.getOptions().get(FlinkOptions.VECTOR_COLUMNS.key()));
     String sparkVersion = 
catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = 
SparkDataSourceTableUtils.getSparkTableProperties(
         partitionKeys,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
new file mode 100644
index 000000000000..1b6cda8c1013
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.lang.reflect.Array;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end tests for Flink vector column read/write support.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVectorDataSource {
+
+  @TempDir
+  Path tempDir;
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testVectorRoundTrip(HoodieTableType tableType) throws Exception {
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    String tablePath = tempDir.resolve("round_trip_" + 
tableType.name()).toUri().toString();
+    createVectorTable(
+        tableEnv,
+        "vector_table",
+        tablePath,
+        tableType,
+        "embedding:4,features:3,codes:4,nullable_embedding:2");
+
+    execInsertSql(tableEnv,
+        "INSERT INTO vector_table VALUES "
+            + "('id1', "
+            + " ARRAY[CAST(1.0 AS FLOAT), CAST(2.0 AS FLOAT), CAST(3.0 AS 
FLOAT), CAST(4.0 AS FLOAT)], "
+            + " ARRAY[CAST(0.1 AS DOUBLE), CAST(0.2 AS DOUBLE), CAST(0.3 AS 
DOUBLE)], "
+            + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT), CAST(3 AS 
TINYINT), CAST(4 AS TINYINT)], "
+            + " ARRAY[CAST(9.0 AS FLOAT), CAST(9.5 AS FLOAT)], "
+            + " 'label1', ARRAY['red', 'blue'], 1000), "
+            + "('id2', "
+            + " ARRAY[CAST(-1.0 AS FLOAT), CAST(-2.0 AS FLOAT), CAST(-3.0 AS 
FLOAT), CAST(-4.0 AS FLOAT)], "
+            + " ARRAY[CAST(1.1 AS DOUBLE), CAST(1.2 AS DOUBLE), CAST(1.3 AS 
DOUBLE)], "
+            + " ARRAY[CAST(-1 AS TINYINT), CAST(-2 AS TINYINT), CAST(-3 AS 
TINYINT), CAST(-4 AS TINYINT)], "
+            + " CAST(NULL AS ARRAY<FLOAT>), "
+            + " 'label2', ARRAY['green'], 2000)");
+
+    assertStoredVectorSchema(tablePath, "embedding", 4, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    assertStoredVectorSchema(tablePath, "features", 3, 
HoodieSchema.Vector.VectorElementType.DOUBLE);
+    assertStoredVectorSchema(tablePath, "codes", 4, 
HoodieSchema.Vector.VectorElementType.INT8);
+    assertStoredVectorSchema(tablePath, "nullable_embedding", 2, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+
+    List<Row> rows = collect(tableEnv,
+        "SELECT id, embedding, features, codes, nullable_embedding, label, 
tags, ts FROM vector_table ORDER BY id");
+    assertEquals(2, rows.size());
+
+    Row first = rows.get(0);
+    assertEquals("id1", first.getField(0));
+    assertFloatArray(first.getField(1), new float[] {1.0f, 2.0f, 3.0f, 4.0f});
+    assertDoubleArray(first.getField(2), new double[] {0.1d, 0.2d, 0.3d});
+    assertByteArray(first.getField(3), new byte[] {1, 2, 3, 4});
+    assertFloatArray(first.getField(4), new float[] {9.0f, 9.5f});
+    assertEquals("label1", first.getField(5));
+    assertObjectArray(first.getField(6), new Object[] {"red", "blue"});
+    assertEquals(1000L, first.getField(7));
+
+    Row second = rows.get(1);
+    assertEquals("id2", second.getField(0));
+    assertFloatArray(second.getField(1), new float[] {-1.0f, -2.0f, -3.0f, 
-4.0f});
+    assertDoubleArray(second.getField(2), new double[] {1.1d, 1.2d, 1.3d});
+    assertByteArray(second.getField(3), new byte[] {-1, -2, -3, -4});
+    assertNull(second.getField(4));
+    assertEquals("label2", second.getField(5));
+    assertObjectArray(second.getField(6), new Object[] {"green"});
+    assertEquals(2000L, second.getField(7));
+
+    List<Row> nonVectorProjection = collect(tableEnv, "SELECT id, label, ts 
FROM vector_table ORDER BY id");
+    assertEquals(Row.of("id1", "label1", 1000L), nonVectorProjection.get(0));
+    assertEquals(Row.of("id2", "label2", 2000L), nonVectorProjection.get(1));
+
+    List<Row> vectorProjection = collect(tableEnv, "SELECT id, embedding FROM 
vector_table WHERE id = 'id2'");
+    assertEquals(1, vectorProjection.size());
+    assertFloatArray(vectorProjection.get(0).getField(1), new float[] {-1.0f, 
-2.0f, -3.0f, -4.0f});
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testVectorAppendWriteRoundTrip(HoodieTableType tableType) throws 
Exception {
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    String tablePath = tempDir.resolve("append_write_" + 
tableType.name()).toUri().toString();
+    createVectorTable(tableEnv, "vector_table", tablePath, tableType, 
"embedding:3,features:2,codes:3", "insert");
+
+    execInsertSql(tableEnv,
+        "INSERT INTO vector_table(id, embedding, features, codes, label, ts) 
VALUES "
+            + "('id1', "
+            + " ARRAY[CAST(1.0 AS FLOAT), CAST(1.5 AS FLOAT), CAST(2.0 AS 
FLOAT)], "
+            + " ARRAY[CAST(0.1 AS DOUBLE), CAST(0.2 AS DOUBLE)], "
+            + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT), CAST(3 AS 
TINYINT)], "
+            + " 'append1', 1000), "
+            + "('id2', "
+            + " ARRAY[CAST(3.0 AS FLOAT), CAST(3.5 AS FLOAT), CAST(4.0 AS 
FLOAT)], "
+            + " ARRAY[CAST(1.1 AS DOUBLE), CAST(1.2 AS DOUBLE)], "
+            + " ARRAY[CAST(-1 AS TINYINT), CAST(-2 AS TINYINT), CAST(-3 AS 
TINYINT)], "
+            + " 'append2', 2000)");
+
+    assertStoredVectorSchema(tablePath, "embedding", 3, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    assertStoredVectorSchema(tablePath, "features", 2, 
HoodieSchema.Vector.VectorElementType.DOUBLE);
+    assertStoredVectorSchema(tablePath, "codes", 3, 
HoodieSchema.Vector.VectorElementType.INT8);
+
+    List<Row> rows = collect(tableEnv, "SELECT id, embedding, features, codes, 
label, ts FROM vector_table ORDER BY id");
+    assertEquals(2, rows.size());
+
+    assertEquals("id1", rows.get(0).getField(0));
+    assertFloatArray(rows.get(0).getField(1), new float[] {1.0f, 1.5f, 2.0f});
+    assertDoubleArray(rows.get(0).getField(2), new double[] {0.1d, 0.2d});
+    assertByteArray(rows.get(0).getField(3), new byte[] {1, 2, 3});
+    assertEquals("append1", rows.get(0).getField(4));
+    assertEquals(1000L, rows.get(0).getField(5));
+
+    assertEquals("id2", rows.get(1).getField(0));
+    assertFloatArray(rows.get(1).getField(1), new float[] {3.0f, 3.5f, 4.0f});
+    assertDoubleArray(rows.get(1).getField(2), new double[] {1.1d, 1.2d});
+    assertByteArray(rows.get(1).getField(3), new byte[] {-1, -2, -3});
+    assertEquals("append2", rows.get(1).getField(4));
+    assertEquals(2000L, rows.get(1).getField(5));
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testVectorUpsert(HoodieTableType tableType) throws Exception {
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    String tablePath = tempDir.resolve("upsert_" + 
tableType.name()).toUri().toString();
+    createVectorTable(tableEnv, "vector_table", tablePath, tableType, 
"embedding:2");
+
+    execInsertSql(tableEnv,
+        "INSERT INTO vector_table(id, embedding, label, ts) VALUES "
+            + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(1.1 AS FLOAT)], 'old1', 
1), "
+            + "('id2', ARRAY[CAST(2.0 AS FLOAT), CAST(2.2 AS FLOAT)], 'old2', 
1)");
+    execInsertSql(tableEnv,
+        "INSERT INTO vector_table(id, embedding, label, ts) VALUES "
+            + "('id1', ARRAY[CAST(9.0 AS FLOAT), CAST(9.9 AS FLOAT)], 'new1', 
10), "
+            + "('id3', ARRAY[CAST(3.0 AS FLOAT), CAST(3.3 AS FLOAT)], 'new3', 
1)");
+
+    List<Row> rows = collect(tableEnv, "SELECT id, embedding, label, ts FROM 
vector_table ORDER BY id");
+    assertEquals(3, rows.size());
+
+    assertEquals("id1", rows.get(0).getField(0));
+    assertFloatArray(rows.get(0).getField(1), new float[] {9.0f, 9.9f});
+    assertEquals("new1", rows.get(0).getField(2));
+    assertEquals(10L, rows.get(0).getField(3));
+
+    assertEquals("id2", rows.get(1).getField(0));
+    assertFloatArray(rows.get(1).getField(1), new float[] {2.0f, 2.2f});
+    assertEquals("old2", rows.get(1).getField(2));
+
+    assertEquals("id3", rows.get(2).getField(0));
+    assertFloatArray(rows.get(2).getField(1), new float[] {3.0f, 3.3f});
+    assertEquals("new3", rows.get(2).getField(2));
+  }
+
+  @Test
+  public void testVectorDimensionMismatchFails() {
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    String tablePath = 
tempDir.resolve("dimension_mismatch").toUri().toString();
+    createVectorTable(tableEnv, "vector_table", tablePath, 
HoodieTableType.COPY_ON_WRITE, "embedding:3");
+
+    Exception exception = assertThrows(Exception.class, () -> 
execInsertSql(tableEnv,
+        "INSERT INTO vector_table(id, embedding, label, ts) VALUES "
+            + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(2.0 AS FLOAT)], 'bad', 
1)"));
+    assertTrue(containsMessage(exception, "dimension mismatch"),
+        "Expected dimension mismatch in exception chain, got: " + 
exception.getMessage());
+  }
+
+  private static void createVectorTable(
+      TableEnvironment tableEnv,
+      String tableName,
+      String tablePath,
+      HoodieTableType tableType,
+      String vectorColumns) {
+    createVectorTable(tableEnv, tableName, tablePath, tableType, 
vectorColumns, null);
+  }
+
+  private static void createVectorTable(
+      TableEnvironment tableEnv,
+      String tableName,
+      String tablePath,
+      HoodieTableType tableType,
+      String vectorColumns,
+      String writeOperation) {
+    tableEnv.executeSql(
+        "CREATE TABLE " + tableName + " ("
+            + " id STRING,"
+            + " embedding ARRAY<FLOAT>,"
+            + " features ARRAY<DOUBLE>,"
+            + " codes ARRAY<TINYINT>,"
+            + " nullable_embedding ARRAY<FLOAT>,"
+            + " label STRING,"
+            + " tags ARRAY<STRING>,"
+            + " ts BIGINT,"
+            + " PRIMARY KEY (id) NOT ENFORCED"
+            + ") WITH ("
+            + " 'connector' = 'hudi',"
+            + " 'path' = '" + tablePath + "',"
+            + " '" + FlinkOptions.TABLE_TYPE.key() + "' = '" + 
tableType.name() + "',"
+            + " '" + FlinkOptions.ORDERING_FIELDS.key() + "' = 'ts',"
+            + " '" + FlinkOptions.VECTOR_COLUMNS.key() + "' = '" + 
vectorColumns + "',"
+            + " '" + FlinkOptions.METADATA_ENABLED.key() + "' = 'false',"
+            + " '" + FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key() + "' = 
'false',"
+            + " '" + FlinkOptions.COMPACTION_ASYNC_ENABLED.key() + "' = 
'false',"
+            + (writeOperation == null ? "" : " '" + 
FlinkOptions.OPERATION.key() + "' = '" + writeOperation + "',")
+            + " '" + FlinkOptions.WRITE_TASKS.key() + "' = '1',"
+            + " '" + FlinkOptions.READ_TASKS.key() + "' = '1'"
+            + ")");
+  }
+
+  private static void execInsertSql(TableEnvironment tableEnv, String insert) 
throws ExecutionException, InterruptedException {
+    TableResult tableResult = tableEnv.executeSql(insert);
+    tableResult.await();
+  }
+
+  private static List<Row> collect(TableEnvironment tableEnv, String query) {
+    return CollectionUtil.iteratorToList(tableEnv.executeSql(query).collect());
+  }
+
+  private static void assertStoredVectorSchema(
+      String tablePath,
+      String fieldName,
+      int dimension,
+      HoodieSchema.Vector.VectorElementType elementType) {
+    Configuration conf = new Configuration();
+    HoodieSchema tableSchema = StreamerUtil.getTableConfig(tablePath, 
HadoopConfigurations.getHadoopConf(conf))
+        .flatMap(config -> config.getTableCreateSchema())
+        .orElseThrow(() -> new AssertionError("Expected table create schema 
for " + tablePath));
+    Option<HoodieSchemaField> fieldOpt = tableSchema.getField(fieldName);
+    assertTrue(fieldOpt.isPresent(), "Expected field " + fieldName + " in 
table schema");
+    HoodieSchema fieldSchema = fieldOpt.get().schema().getNonNullType();
+    assertEquals(HoodieSchemaType.VECTOR, fieldSchema.getType());
+    HoodieSchema.Vector vector = (HoodieSchema.Vector) fieldSchema;
+    assertEquals(dimension, vector.getDimension());
+    assertEquals(elementType, vector.getVectorElementType());
+  }
+
+  private static int arrayLength(Object value) {
+    assertNotNull(value);
+    if (value instanceof List) {
+      return ((List<?>) value).size();
+    }
+    return Array.getLength(value);
+  }
+
+  private static Object arrayValue(Object value, int index) {
+    if (value instanceof List) {
+      return ((List<?>) value).get(index);
+    }
+    return Array.get(value, index);
+  }
+
+  private static void assertFloatArray(Object value, float[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], ((Number) arrayValue(value, i)).floatValue(), 
0.00001f);
+    }
+  }
+
+  private static void assertDoubleArray(Object value, double[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], ((Number) arrayValue(value, i)).doubleValue(), 
0.0000001d);
+    }
+  }
+
+  private static void assertByteArray(Object value, byte[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], ((Number) arrayValue(value, i)).byteValue());
+    }
+  }
+
+  private static void assertObjectArray(Object value, Object[] expected) {
+    assertEquals(expected.length, arrayLength(value));
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], arrayValue(value, i));
+    }
+  }
+
+  private static boolean containsMessage(Throwable throwable, String message) {
+    return throwable != null && ((throwable.getMessage() != null && 
throwable.getMessage().contains(message))
+        || containsMessage(throwable.getCause(), message));
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
index d7c68b7e318d..d234fb782dbe 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
@@ -32,6 +32,7 @@ import org.apache.flink.formats.json.JsonToRowDataConverters;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
@@ -39,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -201,4 +203,67 @@ class TestRowDataToAvroConverters {
             + (typeValue == null ? "null" : typeValue.getClass().getName()));
     Assertions.assertEquals("OUT_OF_LINE", typeValue.toString());
   }
-}
\ No newline at end of file
+
+  @Test
+  void testRowDataToAvroVectorEncoding() {
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "vector_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("embedding", HoodieSchema.createVector(2)),
+            HoodieSchemaField.of("features", HoodieSchema.createVector(2, 
HoodieSchema.Vector.VectorElementType.DOUBLE)),
+            HoodieSchemaField.of("codes", HoodieSchema.createVector(3, 
HoodieSchema.Vector.VectorElementType.INT8))));
+    RowType rowType = (RowType) 
HoodieSchemaConverter.convertToDataType(schema).getLogicalType();
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+        RowDataToAvroConverters.createConverter(rowType);
+
+    GenericRowData rowData = GenericRowData.of(
+        1,
+        new GenericArrayData(new float[] {1.0f, 2.0f}),
+        new GenericArrayData(new double[] {3.0d, 4.0d}),
+        new GenericArrayData(new byte[] {5, 6, 7}));
+    GenericRecord avroRecord = (GenericRecord) converter.convert(schema, 
rowData);
+
+    Assertions.assertArrayEquals(vectorBytes(1.0f, 2.0f),
+        ((GenericData.Fixed) avroRecord.get("embedding")).bytes());
+    Assertions.assertArrayEquals(vectorBytes(3.0d, 4.0d),
+        ((GenericData.Fixed) avroRecord.get("features")).bytes());
+    Assertions.assertArrayEquals(new byte[] {5, 6, 7},
+        ((GenericData.Fixed) avroRecord.get("codes")).bytes());
+  }
+
+  @Test
+  void testRowDataToAvroVectorValidation() {
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "vector_record",
+        null,
+        null,
+        Arrays.asList(HoodieSchemaField.of("embedding", 
HoodieSchema.createVector(2))));
+    RowType rowType = (RowType) 
HoodieSchemaConverter.convertToDataType(schema).getLogicalType();
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+        RowDataToAvroConverters.createConverter(rowType);
+
+    Assertions.assertThrows(IllegalArgumentException.class,
+        () -> converter.convert(schema, GenericRowData.of(new 
GenericArrayData(new float[] {1.0f}))));
+  }
+
+  private static byte[] vectorBytes(float... values) {
+    ByteBuffer buffer = ByteBuffer.allocate(values.length * Float.BYTES)
+        .order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+    for (float value : values) {
+      buffer.putFloat(value);
+    }
+    return buffer.array();
+  }
+
+  private static byte[] vectorBytes(double... values) {
+    ByteBuffer buffer = ByteBuffer.allocate(values.length * Double.BYTES)
+        .order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+    for (double value : values) {
+      buffer.putDouble(value);
+    }
+    return buffer.array();
+  }
+}

Reply via email to