This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c1cb392df14 feat(vector): add converters from spark to hoodieSchema 
for vectors (#18190)
2c1cb392df14 is described below

commit 2c1cb392df14228037cc3318c151ee50ebd8912f
Author: Rahil C <[email protected]>
AuthorDate: Fri Mar 6 21:20:56 2026 -0800

    feat(vector): add converters from spark to hoodieSchema for vectors (#18190)
    
    * add VECTOR type to HoodieSchema
    
    * keep fixed bytes only
    
    * address elementType
    
    * fixes
    
    * add all tests
    
    * move metadata to schema level instead of as fields
    
    * fix ci
    
    * use enum for vector element type
    
    * remove nesting for fixed bytes
    
    * checkstyle fixes
    
    * remove fixed requirement
    
    * address comments
    
    * fix nested structures test
    
    * check style
    
    * feat(vector): add converters from spark to hoodieSchema for vectors
    
    * minor fix
    
    * use generic metadata field for dimension property
    
    * address feedback
    
    * address ethan feedback
    
    * add type descriptor and parser for hudi_type metadata
    
    * address vinoth comments
    
    * self review, additional test coverage
    
    * address initial comments
    
    * do self review and address voon comments
    
    * minor fix
    
    * fix build after rebase
    
    * fix ci for spark 3.3 and 3.4, needed to implement the same code in the 
avro serializer and deserializers for those spark versions
    
    * minor space fix
    
    * add spark 4.0 to fix failing test
    
    * address voon indepth comments
    
    * address voon final comments
    
    * address comments again to make type descriptor api more clear
---
 .../apache/hudi/HoodieSchemaConversionUtils.scala  |   1 +
 .../sql/avro/HoodieSparkSchemaConverters.scala     |  76 ++++--
 .../apache/hudi/common/schema/HoodieSchema.java    | 129 +++++++++-
 .../hudi/common/schema/TestHoodieSchema.java       | 121 ++++++++-
 .../hudi/TestHoodieSchemaConversionUtils.scala     | 284 +++++++++++++++++++++
 .../org/apache/spark/sql/avro/TestAvroSerDe.scala  | 136 +++++++++-
 .../spark/sql/avro/TestSchemaConverters.scala      |  14 +-
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  35 +++
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  38 +++
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  35 +++
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  38 +++
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  35 +++
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  38 +++
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  35 +++
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  38 +++
 15 files changed, 1029 insertions(+), 24 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
index 86061f1cc414..4f6c8e512173 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
@@ -138,6 +138,7 @@ object HoodieSchemaConversionUtils {
     try {
       HoodieSparkSchemaConverters.toHoodieType(structType, nullable, 
structName, recordNamespace)
     } catch {
+      case h: HoodieSchemaException => throw h
       case a: AvroRuntimeException => throw new 
HoodieSchemaException(a.getMessage, a)
       case e: Exception => throw new HoodieSchemaException(
         s"Failed to convert struct type to HoodieSchema: $structType", e)
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
index c0f386089406..a9ea0aee94b5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
 
 import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
 import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema, 
HoodieSchemaField, HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.types.Decimal.minBytesForPrecision
 import org.apache.spark.sql.types._
@@ -44,7 +45,7 @@ object HoodieSparkSchemaConverters {
   /**
    * Internal wrapper for SQL data type and nullability.
    */
-  case class SchemaType(dataType: DataType, nullable: Boolean)
+  case class SchemaType(dataType: DataType, nullable: Boolean, metadata: 
Option[Metadata] = None)
 
   def toSqlType(hoodieSchema: HoodieSchema): (DataType, Boolean) = {
     val result = toSqlTypeHelper(hoodieSchema, Set.empty)
@@ -79,6 +80,29 @@ object HoodieSparkSchemaConverters {
         HoodieSchema.createDecimal(name, nameSpace, null, d.precision, 
d.scale, fixedSize)
 
       // Complex types
+      case ArrayType(elementSparkType, containsNull)
+          if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+            
HoodieSchema.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).getType
 == HoodieSchemaType.VECTOR =>
+        if (containsNull) {
+          throw new HoodieSchemaException(
+            s"VECTOR type does not support nullable elements (field: 
$recordName)")
+        }
+
+        val vectorSchema = HoodieSchema
+          
.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+          .asInstanceOf[HoodieSchema.Vector]
+        val dimension = vectorSchema.getDimension
+
+        val elementType = vectorSchema.getVectorElementType
+
+        val expectedSparkType = sparkTypeForVectorElementType(elementType)
+        if (elementSparkType != expectedSparkType) {
+          throw new HoodieSchemaException(
+            s"VECTOR element type mismatch for field $recordName: metadata 
requires $elementType, Spark array has $elementSparkType")
+        }
+
+        HoodieSchema.createVector(dimension, elementType)
+
       case ArrayType(elementType, containsNull) =>
         val elementSchema = toHoodieType(elementType, containsNull, 
recordName, nameSpace, metadata)
         HoodieSchema.createArray(elementSchema)
@@ -88,7 +112,7 @@ object HoodieSparkSchemaConverters {
         HoodieSchema.createMap(valueSchema)
 
       case blobStruct: StructType if 
metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
-        
metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).equalsIgnoreCase(HoodieSchemaType.BLOB.name())
 =>
+        
HoodieSchema.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).getType
 == HoodieSchemaType.BLOB =>
         // Validate blob structure before accepting
         validateBlobStructure(blobStruct)
         HoodieSchema.createBlob()
@@ -184,6 +208,15 @@ object HoodieSparkSchemaConverters {
         SchemaType(StringType, nullable = false)
 
       // Complex types
+      case HoodieSchemaType.VECTOR =>
+        val vectorSchema = hoodieSchema.asInstanceOf[HoodieSchema.Vector]
+        val metadata = new MetadataBuilder()
+          .putString(HoodieSchema.TYPE_METADATA_FIELD, 
vectorSchema.toTypeDescriptor)
+          .build()
+
+        val sparkElementType = 
sparkTypeForVectorElementType(vectorSchema.getVectorElementType)
+        SchemaType(ArrayType(sparkElementType, containsNull = false), nullable 
= false, Some(metadata))
+
       case HoodieSchemaType.BLOB | HoodieSchemaType.RECORD =>
         val fullName = hoodieSchema.getFullName
         if (existingRecordNames.contains(fullName)) {
@@ -196,25 +229,27 @@ object HoodieSparkSchemaConverters {
         val newRecordNames = existingRecordNames + fullName
         val fields = hoodieSchema.getFields.asScala.map { f =>
           val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
-          val commentMetadata = if (f.doc().isPresent && 
!f.doc().get().isEmpty) {
-            new MetadataBuilder().putString("comment", f.doc().get()).build()
-          } else {
-            Metadata.empty
-          }
           val fieldSchema = f.getNonNullSchema
-          val metadata = if (fieldSchema.isBlobField) {
-            // Mark blob fields with metadata for identification.
-            // This assumes blobs are always part of a record and not the top 
level schema itself
-            new MetadataBuilder()
-              .withMetadata(commentMetadata)
-              .putString(HoodieSchema.TYPE_METADATA_FIELD, 
HoodieSchemaType.BLOB.name())
-              .build()
-          } else {
-            commentMetadata
+          val metadataBuilder = new MetadataBuilder()
+            .withMetadata(schemaType.metadata.getOrElse(Metadata.empty))
+          if (f.doc().isPresent && f.doc().get().nonEmpty) {
+            metadataBuilder.putString("comment", f.doc().get())
+          }
+          if (fieldSchema.isBlobField) {
+            metadataBuilder.putString(HoodieSchema.TYPE_METADATA_FIELD, 
HoodieSchema.Blob.TYPE_DESCRIPTOR)
           }
+          val metadata = metadataBuilder.build()
           StructField(f.name(), schemaType.dataType, schemaType.nullable, 
metadata)
         }
-        SchemaType(StructType(fields.toSeq), nullable = false)
+        // For BLOB types, propagate type metadata via SchemaType
+        val schemaTypeMetadata = if (hoodieSchema.getType == 
HoodieSchemaType.BLOB) {
+          Some(new MetadataBuilder()
+            .putString(HoodieSchema.TYPE_METADATA_FIELD, 
hoodieSchema.asInstanceOf[HoodieSchema.Blob].toTypeDescriptor)
+            .build())
+        } else {
+          None
+        }
+        SchemaType(StructType(fields.toSeq), nullable = false, 
schemaTypeMetadata)
 
       case HoodieSchemaType.ARRAY =>
         val elementSchema = hoodieSchema.getElementType
@@ -286,6 +321,13 @@ object HoodieSparkSchemaConverters {
         f.name.matches("member\\d+") && f.nullable
       }
   }
+
+  private def sparkTypeForVectorElementType(
+      elementType: HoodieSchema.Vector.VectorElementType): DataType = 
elementType match {
+    case HoodieSchema.Vector.VectorElementType.FLOAT => FloatType
+    case HoodieSchema.Vector.VectorElementType.DOUBLE => DoubleType
+    case HoodieSchema.Vector.VectorElementType.INT8 => ByteType
+  }
 }
 
 private[avro] class IncompatibleSchemaException(msg: String, ex: Throwable = 
null) extends Exception(msg, ex)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index f35c77c099f5..4cf23ba17dc8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -35,13 +35,16 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
@@ -84,18 +87,100 @@ import static 
org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
  */
 public class HoodieSchema implements Serializable {
   private static final long serialVersionUID = 1L;
+
   /**
    * Constant representing a null JSON value, equivalent to 
JsonProperties.NULL_VALUE.
    * This provides compatibility with Avro's JsonProperties while maintaining 
Hudi's API.
    */
   public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
   public static final HoodieSchema NULL_SCHEMA = 
HoodieSchema.create(HoodieSchemaType.NULL);
-
   /**
    * Constant to use when attaching type metadata to external schema systems 
like Spark's StructType.
+   * Stores a parameterized type string for custom Hudi logical types such as 
VECTOR and BLOB.
+   * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
    */
   public static final String TYPE_METADATA_FIELD = "hudi_type";
 
+  /**
+   * Parses a type descriptor string for custom Hudi logical types such as 
VECTOR and BLOB.
+   * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
+   * Throws for non-custom logical type names.
+   */
+  public static HoodieSchema parseTypeDescriptor(String descriptor) {
+    Pair<HoodieSchemaType, List<String>> parsedDescriptor = 
tokenizeTypeDescriptor(descriptor);
+    HoodieSchemaType type = parsedDescriptor.getLeft();
+    List<String> params = parsedDescriptor.getRight();
+    switch (type) {
+      case VECTOR:
+        if (params.isEmpty()) {
+          throw new IllegalArgumentException("VECTOR type descriptor must 
include a dimension parameter");
+        }
+        if (params.size() > 2) {
+          throw new IllegalArgumentException(
+              "VECTOR type descriptor supports at most 2 parameters: dimension 
and optional element type");
+        }
+        int dimension;
+        try {
+          dimension = Integer.parseInt(params.get(0));
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid VECTOR dimension: " + 
params.get(0), e);
+        }
+        Vector.VectorElementType elementType = params.size() > 1
+            ? Vector.VectorElementType.fromString(params.get(1))
+            : Vector.VectorElementType.FLOAT;
+        return createVector(dimension, elementType);
+      case BLOB:
+        if (!params.isEmpty()) {
+          throw new IllegalArgumentException(
+              "BLOB type descriptor does not support parameters, got: " + 
params);
+        }
+        return createBlob();
+      default:
+        throw new IllegalArgumentException(
+            "parseTypeDescriptor only supports custom logical types, got: " + 
type);
+    }
+  }
+
+  private static Pair<HoodieSchemaType, List<String>> 
tokenizeTypeDescriptor(String descriptor) {
+    ValidationUtils.checkArgument(descriptor != null && 
!descriptor.trim().isEmpty(),
+        "Type descriptor cannot be null or empty");
+    int parenStart = descriptor.indexOf('(');
+    String typeName;
+    List<String> params;
+    if (parenStart == -1) {
+      typeName = descriptor.trim();
+      params = Collections.emptyList();
+    } else {
+      if (!descriptor.endsWith(")")) {
+        throw new IllegalArgumentException("Malformed type descriptor, missing 
closing ')': " + descriptor);
+      }
+      typeName = descriptor.substring(0, parenStart).trim();
+      String paramStr = descriptor.substring(parenStart + 1, 
descriptor.length() - 1).trim();
+      if (paramStr.isEmpty()) {
+        params = Collections.emptyList();
+      } else {
+        params = Arrays.stream(paramStr.split(","))
+            .map(String::trim)
+            .collect(Collectors.toList());
+      }
+    }
+    HoodieSchemaType type;
+    try {
+      type = HoodieSchemaType.valueOf(typeName.toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Unknown Hudi schema type: " + 
typeName, e);
+    }
+    if (!CUSTOM_LOGICAL_TYPES.contains(type)) {
+      throw new IllegalArgumentException(
+          "parseTypeDescriptor only supports custom logical types, got: " + 
type);
+    }
+    return Pair.of(type, params);
+  }
+
+  private static final Set<HoodieSchemaType> CUSTOM_LOGICAL_TYPES =
+      EnumSet.of(HoodieSchemaType.VECTOR, HoodieSchemaType.BLOB);
+
+
   /**
    * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
    * These patterns are specifically used for column stats generation and 
differ from
@@ -1610,6 +1695,21 @@ public class HoodieSchema implements Serializable {
     }
   }
 
+  /**
+   * Represents a fixed-dimension, typed-element vector stored as an Avro 
FIXED field.
+   *
+   * <p>Each element is serialized contiguously in little-endian byte order
+   * ({@link VectorLogicalType#VECTOR_BYTE_ORDER}).
+   *
+   * <p>Supported element types and their per-element byte sizes:
+   * <ul>
+   *   <li>{@link VectorElementType#FLOAT}  — 4 bytes (IEEE 754 
single-precision)</li>
+   *   <li>{@link VectorElementType#DOUBLE} — 8 bytes (IEEE 754 
double-precision)</li>
+   *   <li>{@link VectorElementType#INT8}   — 1 byte  (signed 8-bit 
integer)</li>
+   * </ul>
+   *
+   * <p>The total FIXED size is {@code dimension * 
elementType.getElementSize()} bytes.
+   */
   public static class Vector extends HoodieSchema {
     private static final String DEFAULT_NAME = "vector";
 
@@ -1715,6 +1815,17 @@ public class HoodieSchema implements Serializable {
       return HoodieSchemaType.VECTOR;
     }
 
+    /**
+     * Returns the type descriptor string for this vector, e.g. "VECTOR(128)" 
or "VECTOR(512, DOUBLE)".
+     * Default element type (FLOAT) is omitted.
+     */
+    public String toTypeDescriptor() {
+      if (getVectorElementType() == VectorElementType.FLOAT) {
+        return "VECTOR(" + getDimension() + ")";
+      }
+      return "VECTOR(" + getDimension() + ", " + getVectorElementType() + ")";
+    }
+
     /**
      * Creates vector schema with specified dimension and element type.
      *
@@ -1987,12 +2098,18 @@ public class HoodieSchema implements Serializable {
     }
   }
 
-  static class VectorLogicalType extends LogicalType {
+  public static class VectorLogicalType extends LogicalType {
     private static final String VECTOR_LOGICAL_TYPE_NAME = "vector";
     private static final String PROP_DIMENSION = "dimension";
     private static final String PROP_ELEMENT_TYPE = "elementType";
     private static final String PROP_STORAGE_BACKING = "storageBacking";
 
+    /**
+     * Byte order used for serializing/deserializing vector elements (FLOAT, 
DOUBLE, etc).
+     * Little-endian is chosen for compatibility with file formats.
+     */
+    public static final ByteOrder VECTOR_BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
+
     private final int dimension;
     private final String elementType;
     private final String storageBacking;
@@ -2329,6 +2446,7 @@ public class HoodieSchema implements Serializable {
    * Blob types represent raw binary data. The data can be stored in-line as a 
byte array or out-of-line as a reference to a file or offset and length within 
that file.
    */
   public static class Blob extends HoodieSchema {
+    public static final String TYPE_DESCRIPTOR = "BLOB";
     private static final String DEFAULT_NAME = "blob";
     private static final List<Schema.Field> BLOB_FIELDS = createBlobFields();
 
@@ -2374,6 +2492,13 @@ public class HoodieSchema implements Serializable {
       return HoodieSchemaType.BLOB;
     }
 
+    /**
+     * Returns the type descriptor string for Blob: "BLOB".
+     */
+    public String toTypeDescriptor() {
+      return TYPE_DESCRIPTOR;
+    }
+
     private static Schema createSchema(String name) {
       Schema blobSchema = Schema.createRecord(name, null, null, false);
       // each instance requires its own copy of the fields list
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 4851d33d1ee6..1032e228034d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -991,7 +991,7 @@ public class TestHoodieSchema {
     assertEquals(1536 * 4, vectorFloat.getFixedSize()); // FLOAT is 4 bytes
     assertEquals(768 * 8, vectorDouble.getFixedSize()); // DOUBLE is 8 bytes
   }
-  
+
   @Test
   void testVectorEquality() {
     HoodieSchema.Vector v1 = HoodieSchema.createVector(1536);
@@ -1018,7 +1018,7 @@ public class TestHoodieSchema {
     // Different class
     assertNotEquals(v1, "string");
   }
-  
+
   @Test
   void testVectorSerialization() throws Exception {
     // Create vector with DOUBLE element type
@@ -2231,4 +2231,121 @@ public class TestHoodieSchema {
     
assertFalse(HoodieSchema.createArray(createRecordWithBlob()).isBlobField());
     assertFalse(HoodieSchema.createMap(createRecordWithBlob()).isBlobField());
   }
+
+  @Test
+  public void testToTypeDescriptorVectorDefaultElementType() {
+    HoodieSchema.Vector vector = HoodieSchema.createVector(128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    assertEquals("VECTOR(128)", vector.toTypeDescriptor());
+  }
+
+  @Test
+  public void testToTypeDescriptorVectorNonDefaultElementType() {
+    HoodieSchema.Vector vector = HoodieSchema.createVector(512, 
HoodieSchema.Vector.VectorElementType.DOUBLE);
+    assertEquals("VECTOR(512, DOUBLE)", vector.toTypeDescriptor());
+  }
+
+  @Test
+  public void testToTypeDescriptorVectorInt8RoundTrip() {
+    HoodieSchema.Vector vector = HoodieSchema.createVector(32, 
HoodieSchema.Vector.VectorElementType.INT8);
+    assertEquals("VECTOR(32, INT8)", vector.toTypeDescriptor());
+
+    HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(32, INT8)");
+    assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+    HoodieSchema.Vector parsedVector = (HoodieSchema.Vector) parsed;
+    assertEquals(32, parsedVector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.INT8, 
parsedVector.getVectorElementType());
+  }
+
+  @Test
+  public void testParseTypeDescriptorVector() {
+    HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(128)");
+    assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+    HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
+    assertEquals(128, vector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, 
vector.getVectorElementType());
+  }
+
+  @Test
+  public void testParseTypeDescriptorVectorWithElementType() {
+    HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(512, 
DOUBLE)");
+    assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+    HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
+    assertEquals(512, vector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, 
vector.getVectorElementType());
+  }
+
+  @Test
+  public void testParseTypeDescriptorThrowsForNonCustomType() {
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("INT"));
+  }
+
+  @Test
+  public void testTypeDescriptorRoundTrip() {
+    HoodieSchema.Vector vector = HoodieSchema.createVector(256, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    String typeString = vector.toTypeDescriptor();
+    HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeString);
+    HoodieSchema.Vector parsedVector = (HoodieSchema.Vector) parsed;
+
+    assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+    assertEquals(256, parsedVector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, 
parsedVector.getVectorElementType());
+
+    // Non-default element type round-trip
+    HoodieSchema.Vector vectorDouble = HoodieSchema.createVector(64, 
HoodieSchema.Vector.VectorElementType.DOUBLE);
+    String typeStringDouble = vectorDouble.toTypeDescriptor();
+    HoodieSchema parsedDouble = 
HoodieSchema.parseTypeDescriptor(typeStringDouble);
+    HoodieSchema.Vector parsedDoubleVector = (HoodieSchema.Vector) 
parsedDouble;
+
+    assertEquals(HoodieSchemaType.VECTOR, parsedDouble.getType());
+    assertEquals(64, parsedDoubleVector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, 
parsedDoubleVector.getVectorElementType());
+  }
+
+  @Test
+  public void testParseTypeDescriptorMalformed() {
+    // Null and empty inputs
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor(null));
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor(""));
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("   "));
+
+    // Missing closing parenthesis
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(128"));
+
+    // Non-numeric dimension
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(abc)"));
+
+    // Missing required dimension parameter
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR"));
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR()"));
+
+    // Too many parameters
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, extra)"));
+
+    // Invalid element type
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(128, INVALID)"));
+
+    // Zero and negative dimensions
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(0)"));
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(-1)"));
+
+    // BLOB does not accept parameters
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("BLOB(1024)"));
+  }
+
+  @Test
+  public void testParseTypeDescriptorUnknownType() {
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("UNKNOWN_TYPE"));
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("FOOBAR(123)"));
+  }
+
+  @Test
+  public void testBlobTypeDescriptorRoundTrip() {
+    HoodieSchema.Blob blob = HoodieSchema.createBlob();
+    String typeString = blob.toTypeDescriptor();
+    assertEquals("BLOB", typeString);
+
+    HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeString);
+    assertEquals(HoodieSchemaType.BLOB, parsed.getType());
+    assertInstanceOf(HoodieSchema.Blob.class, parsed);
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
index 3b82ac4a2449..389b8ad03a59 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
@@ -570,6 +570,290 @@ class TestHoodieSchemaConversionUtils extends FunSuite 
with Matchers {
     internalRowCompare(row1, row2, sparkSchema)
   }
 
+  test("test VECTOR type conversion - Spark to HoodieSchema") {
+    val metadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(128)")
+      .build()
+    val struct = new StructType()
+      .add("id", IntegerType, false)
+      .add("embedding", ArrayType(FloatType, containsNull = false), nullable = 
false, metadata)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "VectorTest", "test")
+
+    // Verify the embedding field is VECTOR type
+    val embeddingField = hoodieSchema.getField("embedding").get()
+    assert(embeddingField.schema().getType == HoodieSchemaType.VECTOR)
+    assert(embeddingField.schema().isInstanceOf[HoodieSchema.Vector])
+
+    val vectorSchema = 
embeddingField.schema().asInstanceOf[HoodieSchema.Vector]
+    assert(vectorSchema.getDimension == 128)
+    assert(vectorSchema.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    assert(!embeddingField.isNullable())
+  }
+
+  test("test VECTOR type conversion - Spark to HoodieSchema for DOUBLE and 
INT8") {
+    val doubleMetadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(64, DOUBLE)")
+      .build()
+    val int8Metadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(32, INT8)")
+      .build()
+
+    val struct = new StructType()
+      .add("embedding_double", ArrayType(DoubleType, containsNull = false), 
nullable = false, doubleMetadata)
+      .add("embedding_int8", ArrayType(ByteType, containsNull = false), 
nullable = false, int8Metadata)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "VectorTypedTest", "test")
+
+    val doubleField = hoodieSchema.getField("embedding_double").get()
+    assert(doubleField.schema().getType == HoodieSchemaType.VECTOR)
+    val doubleVector = doubleField.schema().asInstanceOf[HoodieSchema.Vector]
+    assert(doubleVector.getDimension == 64)
+    assert(doubleVector.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.DOUBLE)
+    assert(!doubleField.isNullable())
+
+    val int8Field = hoodieSchema.getField("embedding_int8").get()
+    assert(int8Field.schema().getType == HoodieSchemaType.VECTOR)
+    val int8Vector = int8Field.schema().asInstanceOf[HoodieSchema.Vector]
+    assert(int8Vector.getDimension == 32)
+    assert(int8Vector.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.INT8)
+    assert(!int8Field.isNullable())
+  }
+
+  test("test VECTOR type conversion - HoodieSchema to Spark") {
+    val vectorSchema = HoodieSchema.createVector(256, 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+      HoodieSchemaField.of("embedding", vectorSchema)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("VectorTest", "test", null, 
fields)
+
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    // Verify the embedding field is ArrayType(FloatType)
+    assert(structType.fields.length == 2)
+    val embeddingField = structType.fields(1)
+    assert(embeddingField.name == "embedding")
+    assert(embeddingField.dataType.isInstanceOf[ArrayType])
+    assert(!embeddingField.nullable)
+
+    val arrayType = embeddingField.dataType.asInstanceOf[ArrayType]
+    assert(arrayType.elementType == FloatType)
+    assert(!arrayType.containsNull)
+
+    // Verify metadata contains type descriptor with dimension
+    assert(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val parsedVector = HoodieSchema.parseTypeDescriptor(
+      
embeddingField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(parsedVector.getType == HoodieSchemaType.VECTOR)
+    assert(parsedVector.getDimension == 256)
+  }
+
+  test("test VECTOR type conversion - HoodieSchema to Spark for DOUBLE and 
INT8") {
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("embedding_double", HoodieSchema.createVector(64, 
HoodieSchema.Vector.VectorElementType.DOUBLE)),
+      HoodieSchemaField.of("embedding_int8", HoodieSchema.createVector(32, 
HoodieSchema.Vector.VectorElementType.INT8))
+    )
+    val hoodieSchema = HoodieSchema.createRecord("VectorTypedTest", "test", 
null, fields)
+
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    val doubleField = structType.fields(0)
+    assert(doubleField.dataType == ArrayType(DoubleType, containsNull = false))
+    assert(doubleField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val parsedDoubleVector = HoodieSchema.parseTypeDescriptor(
+      
doubleField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(parsedDoubleVector.getType == HoodieSchemaType.VECTOR)
+    assert(parsedDoubleVector.getDimension == 64)
+    assert(parsedDoubleVector.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.DOUBLE)
+
+    val int8Field = structType.fields(1)
+    assert(int8Field.dataType == ArrayType(ByteType, containsNull = false))
+    assert(int8Field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val parsedInt8Vector = HoodieSchema.parseTypeDescriptor(
+      
int8Field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(parsedInt8Vector.getType == HoodieSchemaType.VECTOR)
+    assert(parsedInt8Vector.getDimension == 32)
+    assert(parsedInt8Vector.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.INT8)
+  }
+
+  test("test VECTOR round-trip conversion - Spark to HoodieSchema to Spark") {
+    val metadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(512)")
+      .build()
+    val originalStruct = new StructType()
+      .add("id", LongType, false)
+      .add("vector_field", ArrayType(FloatType, containsNull = false), 
nullable = false, metadata)
+      .add("name", StringType, true)
+
+    // Convert Spark -> HoodieSchema -> Spark
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      originalStruct, "RoundTripTest", "test")
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    // Verify structure is preserved
+    assert(convertedStruct.fields.length == originalStruct.fields.length)
+
+    // Verify vector field properties
+    val originalVectorField = originalStruct.fields(1)
+    val convertedVectorField = convertedStruct.fields(1)
+
+    assert(convertedVectorField.name == originalVectorField.name)
+    assert(convertedVectorField.dataType == originalVectorField.dataType)
+    assert(convertedVectorField.nullable == originalVectorField.nullable)
+
+    // Verify metadata is preserved
+    
assert(convertedVectorField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val roundTripVector = HoodieSchema.parseTypeDescriptor(
+      
convertedVectorField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(roundTripVector.getType == HoodieSchemaType.VECTOR)
+    assert(roundTripVector.getDimension == 512)
+
+    // Verify array properties
+    val convertedArrayType = 
convertedVectorField.dataType.asInstanceOf[ArrayType]
+    assert(convertedArrayType.elementType == FloatType)
+    assert(!convertedArrayType.containsNull)
+  }
+
+  test("test VECTOR round-trip conversion - Spark to HoodieSchema to Spark for 
DOUBLE and INT8") {
+    val doubleMetadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(64, DOUBLE)")
+      .build()
+    val int8Metadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(32, INT8)")
+      .build()
+    val originalStruct = new StructType()
+      .add("id", LongType, false)
+      .add("vector_double", ArrayType(DoubleType, containsNull = false), 
nullable = false, doubleMetadata)
+      .add("vector_int8", ArrayType(ByteType, containsNull = false), nullable 
= false, int8Metadata)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      originalStruct, "VectorTypedRoundTripTest", "test")
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    val originalDouble = originalStruct.fields(1)
+    val convertedDouble = convertedStruct.fields(1)
+    assert(convertedDouble.name == originalDouble.name)
+    assert(convertedDouble.dataType == originalDouble.dataType)
+    assert(convertedDouble.nullable == originalDouble.nullable)
+    val convertedDoubleVector = HoodieSchema.parseTypeDescriptor(
+      
convertedDouble.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(convertedDoubleVector.getType == HoodieSchemaType.VECTOR)
+    assert(convertedDoubleVector.getDimension == 64)
+    assert(convertedDoubleVector.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.DOUBLE)
+
+    val originalInt8 = originalStruct.fields(2)
+    val convertedInt8 = convertedStruct.fields(2)
+    assert(convertedInt8.name == originalInt8.name)
+    assert(convertedInt8.dataType == originalInt8.dataType)
+    assert(convertedInt8.nullable == originalInt8.nullable)
+    val convertedInt8Vector = HoodieSchema.parseTypeDescriptor(
+      
convertedInt8.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(convertedInt8Vector.getType == HoodieSchemaType.VECTOR)
+    assert(convertedInt8Vector.getDimension == 32)
+    assert(convertedInt8Vector.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.INT8)
+  }
+
+  test("test VECTOR round-trip conversion - HoodieSchema to Spark to 
HoodieSchema") {
+    val originalVectorSchema = HoodieSchema.createVector(1024, 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+      HoodieSchemaField.of("embedding", originalVectorSchema),
+      HoodieSchemaField.of("metadata", 
HoodieSchema.createNullable(HoodieSchemaType.STRING))
+    )
+    val originalHoodieSchema = HoodieSchema.createRecord("RoundTripTest", 
"test", null, fields)
+
+    // Convert HoodieSchema -> Spark -> HoodieSchema
+    val sparkStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(originalHoodieSchema)
+    val convertedHoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      sparkStruct, "RoundTripTest", "test")
+
+    // Verify the vector field is preserved
+    val convertedEmbeddingField = 
convertedHoodieSchema.getField("embedding").get()
+    assert(convertedEmbeddingField.schema().getType == HoodieSchemaType.VECTOR)
+    assert(convertedEmbeddingField.schema().isInstanceOf[HoodieSchema.Vector])
+
+    val convertedVectorSchema = 
convertedEmbeddingField.schema().asInstanceOf[HoodieSchema.Vector]
+    assert(convertedVectorSchema.getDimension == 1024)
+    assert(convertedVectorSchema.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    assert(!convertedEmbeddingField.isNullable())
+
+    // Verify other fields are preserved
+    assert(convertedHoodieSchema.getFields.size() == 3)
+    assert(convertedHoodieSchema.getField("id").get().schema().getType == 
HoodieSchemaType.LONG)
+    assert(convertedHoodieSchema.getField("metadata").get().isNullable())
+  }
+
+  test("test nullable VECTOR round-trip conversion") {
+    val metadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(64)")
+      .build()
+    val originalStruct = new StructType()
+      .add("id", LongType, false)
+      .add("embedding", ArrayType(FloatType, containsNull = false), nullable = 
true, metadata)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      originalStruct, "NullableVectorTest", "test")
+
+    // Verify the vector field is nullable in HoodieSchema
+    val embeddingField = hoodieSchema.getField("embedding").get()
+    assert(embeddingField.isNullable())
+    val vectorSchema = 
embeddingField.schema().getNonNullType().asInstanceOf[HoodieSchema.Vector]
+    assert(vectorSchema.getDimension == 64)
+    assert(vectorSchema.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.FLOAT)
+
+    // Convert back to Spark
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    val convertedField = convertedStruct.fields(1)
+    assert(convertedField.nullable)
+    assert(convertedField.dataType == ArrayType(FloatType, containsNull = 
false))
+    assert(convertedField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val parsedVector = HoodieSchema.parseTypeDescriptor(
+      
convertedField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(parsedVector.getDimension == 64)
+  }
+
+  test("test nullable VECTOR via UNION preserves metadata in HoodieSchema to 
Spark conversion") {
+    val vectorSchema = HoodieSchema.createVector(256, 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    val nullableVectorSchema = HoodieSchema.createNullable(vectorSchema)
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+      HoodieSchemaField.of("embedding", nullableVectorSchema)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("NullableVectorUnionTest", 
"test", null, fields)
+
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    val embeddingField = structType.fields(1)
+    assert(embeddingField.name == "embedding")
+    assert(embeddingField.nullable)
+    assert(embeddingField.dataType == ArrayType(FloatType, containsNull = 
false))
+
+    // The key assertion: metadata must survive the UNION unwrapping 
(.copy(nullable = true))
+    assert(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val parsedVector = HoodieSchema.parseTypeDescriptor(
+      
embeddingField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assert(parsedVector.getDimension == 256)
+    assert(parsedVector.getVectorElementType == 
HoodieSchema.Vector.VectorElementType.FLOAT)
+  }
+
+  test("test VECTOR element type mismatch throws error") {
+    // Metadata says DOUBLE, but Spark array element type is Float
+    val mismatchMetadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(128, DOUBLE)")
+      .build()
+    val struct = new StructType()
+      .add("embedding", ArrayType(FloatType, containsNull = false), nullable = 
false, mismatchMetadata)
+
+    the[Exception] thrownBy {
+      HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+        struct, "MismatchTest", "test")
+    } should have message (
+      "VECTOR element type mismatch for field embedding: metadata requires 
DOUBLE, Spark array has FloatType")
+  }
+
   private def internalRowCompare(expected: Any, actual: Any, schema: 
DataType): Unit = {
     schema match {
       case StructType(fields) =>
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
index 31367060e5dc..bea088348f6b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
@@ -19,12 +19,18 @@ package org.apache.spark.sql.avro
 
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, IntWrapper}
-import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, 
HoodieSchemaType}
 
 import org.apache.avro.generic.GenericData
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals}
 import org.junit.jupiter.api.Test
 
+import java.nio.{ByteBuffer, ByteOrder}
+import java.util
+
 class TestAvroSerDe extends SparkAdapterSupport {
 
   @Test
@@ -59,4 +65,130 @@ class TestAvroSerDe extends SparkAdapterSupport {
 
     assertEquals(originalAvroRecord, deserializedAvroRecord)
   }
+
+  @Test
+  def testVectorFloatSerDe(): Unit = {
+    val dimension = 4
+    val vectorSchema = HoodieSchema.createVector(dimension, 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("embedding", vectorSchema)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("FloatVectorRecord", "test", 
null, fields)
+    val avroSchema = hoodieSchema.toAvroSchema
+
+    // Build a GenericData.Record with float vector data
+    val buffer = ByteBuffer.allocate(dimension * 
4).order(ByteOrder.LITTLE_ENDIAN)
+    buffer.putFloat(1.0f).putFloat(2.5f).putFloat(-3.0f).putFloat(0.0f)
+    val fixedField = new 
GenericData.Fixed(avroSchema.getField("embedding").schema(), buffer.array())
+
+    val originalRecord = new GenericData.Record(avroSchema)
+    originalRecord.put("embedding", fixedField)
+
+    val (catalystSchema, _) = 
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+    val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema, 
catalystSchema)
+    val serializer = sparkAdapter.createAvroSerializer(catalystSchema, 
hoodieSchema, nullable = false)
+
+    val row = deserializer.deserialize(originalRecord).get
+    val deserializedRecord = serializer.serialize(row)
+
+    assertEquals(originalRecord, deserializedRecord)
+  }
+
+  @Test
+  def testVectorDoubleSerDe(): Unit = {
+    val dimension = 3
+    val vectorSchema = HoodieSchema.createVector(dimension, 
HoodieSchema.Vector.VectorElementType.DOUBLE)
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("embedding", vectorSchema)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("DoubleVectorRecord", "test", 
null, fields)
+    val avroSchema = hoodieSchema.toAvroSchema
+
+    val buffer = ByteBuffer.allocate(dimension * 
8).order(ByteOrder.LITTLE_ENDIAN)
+    buffer.putDouble(1.0).putDouble(-2.5).putDouble(3.14159)
+    val fixedField = new 
GenericData.Fixed(avroSchema.getField("embedding").schema(), buffer.array())
+
+    val originalRecord = new GenericData.Record(avroSchema)
+    originalRecord.put("embedding", fixedField)
+
+    val (catalystSchema, _) = 
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+    val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema, 
catalystSchema)
+    val serializer = sparkAdapter.createAvroSerializer(catalystSchema, 
hoodieSchema, nullable = false)
+
+    val row = deserializer.deserialize(originalRecord).get
+    val deserializedRecord = serializer.serialize(row)
+
+    assertEquals(originalRecord, deserializedRecord)
+  }
+
+  @Test
+  def testVectorFloatByteOrder(): Unit = {
+    val dimension = 2
+    val vectorSchema = HoodieSchema.createVector(dimension, 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    val fields = util.Arrays.asList(
+      HoodieSchemaField.of("embedding", vectorSchema)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("FloatByteOrderRecord", 
"test", null, fields)
+    val avroSchema = hoodieSchema.toAvroSchema
+
+    // Build input: Spark array of floats → serialize → Avro Fixed
+    val inputFloats = Array(1.0f, -2.0f)
+    val sparkRow = {
+      val row = new GenericInternalRow(1)
+      row.update(0, ArrayData.toArrayData(inputFloats))
+      row
+    }
+
+    val (catalystSchema, _) = 
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+    val serializer = sparkAdapter.createAvroSerializer(catalystSchema, 
hoodieSchema, nullable = false)
+    val serialized = 
serializer.serialize(sparkRow).asInstanceOf[GenericData.Record]
+    val fixedBytes = 
serialized.get("embedding").asInstanceOf[GenericData.Fixed].bytes()
+
+    // Assert little-endian IEEE 754 layout byte-by-byte:
+    //   1.0f  LE = 0x00 0x00 0x80 0x3F
+    //  -2.0f  LE = 0x00 0x00 0x00 0xC0
+    val expectedBytes = Array[Byte](
+      0x00, 0x00, 0x80.toByte, 0x3F,
+      0x00, 0x00, 0x00, 0xC0.toByte
+    )
+    assertArrayEquals(expectedBytes, fixedBytes,
+      "Serialized bytes must match little-endian IEEE 754 float layout")
+
+    // Deserialize the raw bytes back and verify float values
+    val fixedField = new 
GenericData.Fixed(avroSchema.getField("embedding").schema(), fixedBytes)
+    val record = new GenericData.Record(avroSchema)
+    record.put("embedding", fixedField)
+
+    val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema, 
catalystSchema)
+    val row = deserializer.deserialize(record).get.asInstanceOf[InternalRow]
+    val resultArray = row.getArray(0)
+    assertEquals(1.0f, resultArray.getFloat(0), 0.0f)
+    assertEquals(-2.0f, resultArray.getFloat(1), 0.0f)
+  }
+
+  @Test
+  def testVectorInt8SerDe(): Unit = {
+    val dimension = 5
+    val vectorSchema = HoodieSchema.createVector(dimension, 
HoodieSchema.Vector.VectorElementType.INT8)
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("embedding", vectorSchema)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("Int8VectorRecord", "test", 
null, fields)
+    val avroSchema = hoodieSchema.toAvroSchema
+
+    val bytes = Array[Byte](1, -2, 127, -128, 0)
+    val fixedField = new 
GenericData.Fixed(avroSchema.getField("embedding").schema(), bytes)
+
+    val originalRecord = new GenericData.Record(avroSchema)
+    originalRecord.put("embedding", fixedField)
+
+    val (catalystSchema, _) = 
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+    val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema, 
catalystSchema)
+    val serializer = sparkAdapter.createAvroSerializer(catalystSchema, 
hoodieSchema, nullable = false)
+
+    val row = deserializer.deserialize(originalRecord).get
+    val deserializedRecord = serializer.serialize(row)
+
+    assertEquals(originalRecord, deserializedRecord)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
index 038dcafab18b..7b2dfdbfbab1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.avro
 
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats
 import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, 
HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
 
 import org.apache.avro.JsonProperties
-import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField, 
StructType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, FloatType, 
MetadataBuilder, StructField, StructType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 
@@ -181,6 +182,17 @@ class TestSchemaConverters {
     assertEquals(HoodieSchemaType.BLOB, 
nestedMapType.getField("nested_blob").get.schema.getType)
   }
 
+  @Test
+  def testVectorContainsNullThrows(): Unit = {
+    val metadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+      .build()
+    val sparkType = ArrayType(FloatType, containsNull = true)
+    assertThrows(classOf[HoodieSchemaException], () => {
+      HoodieSparkSchemaConverters.toHoodieType(sparkType, nullable = false, 
metadata = metadata)
+    })
+  }
+
   /**
    * Validates the content of the blob fields to ensure the fields match our 
expectations.
    * @param dataType the StructType containing the blob fields to validate
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 6466d02e3d0f..b5eba6be24cd 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.math.BigDecimal
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -164,6 +168,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
           s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
       }
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val elementSize = vecElementType.getElementSize
+          (updater, ordinal, value) => {
+            val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+            val expectedSize = Math.multiplyExact(dimension, elementSize)
+            if (bytes.length != expectedSize) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR byte size mismatch: expected=$expectedSize, 
actual=${bytes.length}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val floats = new Array[Float](dimension)
+                var i = 0; while (i < dimension) { floats(i) = 
buffer.getFloat(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(floats))
+              case DoubleType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val doubles = new Array[Double](dimension)
+                var i = 0; while (i < dimension) { doubles(i) = 
buffer.getDouble(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(doubles))
+              case ByteType =>
+                updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      }
+
       // Before we upgrade Avro to 1.8 for logical type support, spark-avro 
converts Long to Date.
       // For backward compatibility, we still keep this conversion.
       case (LONG, DateType) => (updater, ordinal, value) =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 874b90f88ae8..a1241b72e58b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import 
org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -145,6 +149,40 @@ private[sql] class AvroSerializer(rootCatalystType: 
DataType,
           decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
             LogicalTypes.decimal(d.precision, d.scale))
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val bufferSize = Math.multiplyExact(dimension, 
vecElementType.getElementSize)
+          (getter, ordinal) => {
+            val arrayData = getter.getArray(ordinal)
+            if (arrayData.numElements() != dimension) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+                s"expected=$dimension, actual=${arrayData.numElements()}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case DoubleType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case ByteType =>
+                val bytes = new Array[Byte](dimension)
+                var i = 0; while (i < dimension) { bytes(i) = 
arrayData.getByte(i); i += 1 }
+                new Fixed(avroType, bytes)
+              case _ => throw new IncompatibleSchemaException(errorPrefix +
+                s"schema is incompatible (sqlType = ${catalystType.sql}, 
avroType = $avroType)")
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(errorPrefix +
+          s"schema is incompatible (sqlType = ${catalystType.sql}, avroType = 
$avroType)")
+      }
+
       case (StringType, ENUM) =>
         val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
         (getter, ordinal) =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 6466d02e3d0f..b5eba6be24cd 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.math.BigDecimal
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -164,6 +168,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
           s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
       }
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val elementSize = vecElementType.getElementSize
+          (updater, ordinal, value) => {
+            val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+            val expectedSize = Math.multiplyExact(dimension, elementSize)
+            if (bytes.length != expectedSize) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR byte size mismatch: expected=$expectedSize, 
actual=${bytes.length}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val floats = new Array[Float](dimension)
+                var i = 0; while (i < dimension) { floats(i) = 
buffer.getFloat(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(floats))
+              case DoubleType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val doubles = new Array[Double](dimension)
+                var i = 0; while (i < dimension) { doubles(i) = 
buffer.getDouble(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(doubles))
+              case ByteType =>
+                updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      }
+
       // Before we upgrade Avro to 1.8 for logical type support, spark-avro 
converts Long to Date.
       // For backward compatibility, we still keep this conversion.
       case (LONG, DateType) => (updater, ordinal, value) =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 874b90f88ae8..a1241b72e58b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import 
org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -145,6 +149,40 @@ private[sql] class AvroSerializer(rootCatalystType: 
DataType,
           decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
             LogicalTypes.decimal(d.precision, d.scale))
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val bufferSize = Math.multiplyExact(dimension, 
vecElementType.getElementSize)
+          (getter, ordinal) => {
+            val arrayData = getter.getArray(ordinal)
+            if (arrayData.numElements() != dimension) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+                s"expected=$dimension, actual=${arrayData.numElements()}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case DoubleType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case ByteType =>
+                val bytes = new Array[Byte](dimension)
+                var i = 0; while (i < dimension) { bytes(i) = 
arrayData.getByte(i); i += 1 }
+                new Fixed(avroType, bytes)
+              case _ => throw new IncompatibleSchemaException(errorPrefix +
+                s"schema is incompatible (sqlType = ${catalystType.sql}, 
avroType = $avroType)")
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(errorPrefix +
+          s"schema is incompatible (sqlType = ${catalystType.sql}, avroType = 
$avroType)")
+      }
+
       case (StringType, ENUM) =>
         val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
         (getter, ordinal) =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 8623d24c6f58..d10ff1edfe87 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.math.BigDecimal
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -164,6 +168,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
           s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
       }
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val elementSize = vecElementType.getElementSize
+          (updater, ordinal, value) => {
+            val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+            val expectedSize = Math.multiplyExact(dimension, elementSize)
+            if (bytes.length != expectedSize) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR byte size mismatch: expected=$expectedSize, 
actual=${bytes.length}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val floats = new Array[Float](dimension)
+                var i = 0; while (i < dimension) { floats(i) = 
buffer.getFloat(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(floats))
+              case DoubleType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val doubles = new Array[Double](dimension)
+                var i = 0; while (i < dimension) { doubles(i) = 
buffer.getDouble(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(doubles))
+              case ByteType =>
+                updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      }
+
       // Before we upgrade Avro to 1.8 for logical type support, spark-avro 
converts Long to Date.
       // For backward compatibility, we still keep this conversion.
       case (LONG, DateType) => (updater, ordinal, value) =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 968cc8ff02c6..756ef82a55d2 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -35,6 +38,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, 
SQLConf}
 import org.apache.spark.sql.types._
 
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -144,6 +148,40 @@ private[sql] class AvroSerializer(rootCatalystType: 
DataType,
           decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
             LogicalTypes.decimal(d.precision, d.scale))
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val bufferSize = Math.multiplyExact(dimension, 
vecElementType.getElementSize)
+          (getter, ordinal) => {
+            val arrayData = getter.getArray(ordinal)
+            if (arrayData.numElements() != dimension) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+                s"expected=$dimension, actual=${arrayData.numElements()}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case DoubleType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case ByteType =>
+                val bytes = new Array[Byte](dimension)
+                var i = 0; while (i < dimension) { bytes(i) = 
arrayData.getByte(i); i += 1 }
+                new Fixed(avroType, bytes)
+              case _ => throw new IncompatibleSchemaException(errorPrefix +
+                s"schema is incompatible (sqlType = ${catalystType.sql}, 
avroType = $avroType)")
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(errorPrefix +
+          s"schema is incompatible (sqlType = ${catalystType.sql}, avroType = 
$avroType)")
+      }
+
       case (StringType, ENUM) =>
         val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
         (getter, ordinal) =>
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 8623d24c6f58..926e5b9f7dbd 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.math.BigDecimal
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -212,6 +216,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
         val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
         updater.setDecimal(ordinal, decimal)
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val elementSize = vecElementType.getElementSize
+          (updater, ordinal, value) => {
+            val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+            val expectedSize = Math.multiplyExact(dimension, elementSize)
+            if (bytes.length != expectedSize) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR byte size mismatch: expected=$expectedSize, 
actual=${bytes.length}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val floats = new Array[Float](dimension)
+                var i = 0; while (i < dimension) { floats(i) = 
buffer.getFloat(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(floats))
+              case DoubleType =>
+                val buffer = 
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                val doubles = new Array[Double](dimension)
+                var i = 0; while (i < dimension) { doubles(i) = 
buffer.getDouble(); i += 1 }
+                updater.set(ordinal, ArrayData.toArrayData(doubles))
+              case ByteType =>
+                updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      }
+
       case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
         val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
         val bigDecimal = 
decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 968cc8ff02c6..756ef82a55d2 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -35,6 +38,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, 
SQLConf}
 import org.apache.spark.sql.types._
 
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
@@ -144,6 +148,40 @@ private[sql] class AvroSerializer(rootCatalystType: 
DataType,
           decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
             LogicalTypes.decimal(d.precision, d.scale))
 
+      // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+      case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType 
match {
+        case vectorLogicalType: VectorLogicalType =>
+          val dimension = vectorLogicalType.getDimension
+          val vecElementType = 
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+          val bufferSize = Math.multiplyExact(dimension, 
vecElementType.getElementSize)
+          (getter, ordinal) => {
+            val arrayData = getter.getArray(ordinal)
+            if (arrayData.numElements() != dimension) {
+              throw new IncompatibleSchemaException(
+                s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+                s"expected=$dimension, actual=${arrayData.numElements()}")
+            }
+            elementType match {
+              case FloatType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case DoubleType =>
+                val buffer = 
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+                var i = 0; while (i < dimension) { 
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+                new Fixed(avroType, buffer.array())
+              case ByteType =>
+                val bytes = new Array[Byte](dimension)
+                var i = 0; while (i < dimension) { bytes(i) = 
arrayData.getByte(i); i += 1 }
+                new Fixed(avroType, bytes)
+              case _ => throw new IncompatibleSchemaException(errorPrefix +
+                s"schema is incompatible (sqlType = ${catalystType.sql}, 
avroType = $avroType)")
+            }
+          }
+        case _ => throw new IncompatibleSchemaException(errorPrefix +
+          s"schema is incompatible (sqlType = ${catalystType.sql}, avroType = 
$avroType)")
+      }
+
       case (StringType, ENUM) =>
         val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
         (getter, ordinal) =>

Reply via email to