This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2c1cb392df14 feat(vector): add converters from spark to hoodieSchema
for vectors (#18190)
2c1cb392df14 is described below
commit 2c1cb392df14228037cc3318c151ee50ebd8912f
Author: Rahil C <[email protected]>
AuthorDate: Fri Mar 6 21:20:56 2026 -0800
feat(vector): add converters from spark to hoodieSchema for vectors (#18190)
* add VECTOR type to HoodieSchema
* keep fixed bytes only
* address elementType
* fixes
* add all tests
* move metadata to schema level instead of as fields
* fix ci
* use enum for vector element type
* remove nesting for fixed bytes
* checkstyle fixes
* remove fixed requirement
* address comments
* fix nested structures test
* check style
* feat(vector): add converters from spark to hoodieSchema for vectors
* minor fix
* use generic metadata field for dimension property
* address feedback
* address ethan feedback
* add type descriptor and parser for hudi_type metadata
* address vinoth comments
* self review, additional test coverage
* address initial comments
* do self review and address voon comments
* minor fix
* fix build after rebase
* fix ci for spark 3.3 and 3.4, needed to implement the same code in the
avro serializer and deserializers for those spark versions
* minor space fix
* add spark 4.0 to fix failing test
* address voon indepth comments
* address voon final comments
* address comments again to make type descriptor api more clear
---
.../apache/hudi/HoodieSchemaConversionUtils.scala | 1 +
.../sql/avro/HoodieSparkSchemaConverters.scala | 76 ++++--
.../apache/hudi/common/schema/HoodieSchema.java | 129 +++++++++-
.../hudi/common/schema/TestHoodieSchema.java | 121 ++++++++-
.../hudi/TestHoodieSchemaConversionUtils.scala | 284 +++++++++++++++++++++
.../org/apache/spark/sql/avro/TestAvroSerDe.scala | 136 +++++++++-
.../spark/sql/avro/TestSchemaConverters.scala | 14 +-
.../apache/spark/sql/avro/AvroDeserializer.scala | 35 +++
.../org/apache/spark/sql/avro/AvroSerializer.scala | 38 +++
.../apache/spark/sql/avro/AvroDeserializer.scala | 35 +++
.../org/apache/spark/sql/avro/AvroSerializer.scala | 38 +++
.../apache/spark/sql/avro/AvroDeserializer.scala | 35 +++
.../org/apache/spark/sql/avro/AvroSerializer.scala | 38 +++
.../apache/spark/sql/avro/AvroDeserializer.scala | 35 +++
.../org/apache/spark/sql/avro/AvroSerializer.scala | 38 +++
15 files changed, 1029 insertions(+), 24 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
index 86061f1cc414..4f6c8e512173 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
@@ -138,6 +138,7 @@ object HoodieSchemaConversionUtils {
try {
HoodieSparkSchemaConverters.toHoodieType(structType, nullable,
structName, recordNamespace)
} catch {
+ case h: HoodieSchemaException => throw h
case a: AvroRuntimeException => throw new
HoodieSchemaException(a.getMessage, a)
case e: Exception => throw new HoodieSchemaException(
s"Failed to convert struct type to HoodieSchema: $structType", e)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
index c0f386089406..a9ea0aee94b5 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema,
HoodieSchemaField, HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
@@ -44,7 +45,7 @@ object HoodieSparkSchemaConverters {
/**
* Internal wrapper for SQL data type and nullability.
*/
- case class SchemaType(dataType: DataType, nullable: Boolean)
+ case class SchemaType(dataType: DataType, nullable: Boolean, metadata:
Option[Metadata] = None)
def toSqlType(hoodieSchema: HoodieSchema): (DataType, Boolean) = {
val result = toSqlTypeHelper(hoodieSchema, Set.empty)
@@ -79,6 +80,29 @@ object HoodieSparkSchemaConverters {
HoodieSchema.createDecimal(name, nameSpace, null, d.precision,
d.scale, fixedSize)
// Complex types
+ case ArrayType(elementSparkType, containsNull)
+ if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+
HoodieSchema.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).getType
== HoodieSchemaType.VECTOR =>
+ if (containsNull) {
+ throw new HoodieSchemaException(
+ s"VECTOR type does not support nullable elements (field:
$recordName)")
+ }
+
+ val vectorSchema = HoodieSchema
+
.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ .asInstanceOf[HoodieSchema.Vector]
+ val dimension = vectorSchema.getDimension
+
+ val elementType = vectorSchema.getVectorElementType
+
+ val expectedSparkType = sparkTypeForVectorElementType(elementType)
+ if (elementSparkType != expectedSparkType) {
+ throw new HoodieSchemaException(
+ s"VECTOR element type mismatch for field $recordName: metadata
requires $elementType, Spark array has $elementSparkType")
+ }
+
+ HoodieSchema.createVector(dimension, elementType)
+
case ArrayType(elementType, containsNull) =>
val elementSchema = toHoodieType(elementType, containsNull,
recordName, nameSpace, metadata)
HoodieSchema.createArray(elementSchema)
@@ -88,7 +112,7 @@ object HoodieSparkSchemaConverters {
HoodieSchema.createMap(valueSchema)
case blobStruct: StructType if
metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
-
metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).equalsIgnoreCase(HoodieSchemaType.BLOB.name())
=>
+
HoodieSchema.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).getType
== HoodieSchemaType.BLOB =>
// Validate blob structure before accepting
validateBlobStructure(blobStruct)
HoodieSchema.createBlob()
@@ -184,6 +208,15 @@ object HoodieSparkSchemaConverters {
SchemaType(StringType, nullable = false)
// Complex types
+ case HoodieSchemaType.VECTOR =>
+ val vectorSchema = hoodieSchema.asInstanceOf[HoodieSchema.Vector]
+ val metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD,
vectorSchema.toTypeDescriptor)
+ .build()
+
+ val sparkElementType =
sparkTypeForVectorElementType(vectorSchema.getVectorElementType)
+ SchemaType(ArrayType(sparkElementType, containsNull = false), nullable
= false, Some(metadata))
+
case HoodieSchemaType.BLOB | HoodieSchemaType.RECORD =>
val fullName = hoodieSchema.getFullName
if (existingRecordNames.contains(fullName)) {
@@ -196,25 +229,27 @@ object HoodieSparkSchemaConverters {
val newRecordNames = existingRecordNames + fullName
val fields = hoodieSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
- val commentMetadata = if (f.doc().isPresent &&
!f.doc().get().isEmpty) {
- new MetadataBuilder().putString("comment", f.doc().get()).build()
- } else {
- Metadata.empty
- }
val fieldSchema = f.getNonNullSchema
- val metadata = if (fieldSchema.isBlobField) {
- // Mark blob fields with metadata for identification.
- // This assumes blobs are always part of a record and not the top
level schema itself
- new MetadataBuilder()
- .withMetadata(commentMetadata)
- .putString(HoodieSchema.TYPE_METADATA_FIELD,
HoodieSchemaType.BLOB.name())
- .build()
- } else {
- commentMetadata
+ val metadataBuilder = new MetadataBuilder()
+ .withMetadata(schemaType.metadata.getOrElse(Metadata.empty))
+ if (f.doc().isPresent && f.doc().get().nonEmpty) {
+ metadataBuilder.putString("comment", f.doc().get())
+ }
+ if (fieldSchema.isBlobField) {
+ metadataBuilder.putString(HoodieSchema.TYPE_METADATA_FIELD,
HoodieSchema.Blob.TYPE_DESCRIPTOR)
}
+ val metadata = metadataBuilder.build()
StructField(f.name(), schemaType.dataType, schemaType.nullable,
metadata)
}
- SchemaType(StructType(fields.toSeq), nullable = false)
+ // For BLOB types, propagate type metadata via SchemaType
+ val schemaTypeMetadata = if (hoodieSchema.getType ==
HoodieSchemaType.BLOB) {
+ Some(new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD,
hoodieSchema.asInstanceOf[HoodieSchema.Blob].toTypeDescriptor)
+ .build())
+ } else {
+ None
+ }
+ SchemaType(StructType(fields.toSeq), nullable = false,
schemaTypeMetadata)
case HoodieSchemaType.ARRAY =>
val elementSchema = hoodieSchema.getElementType
@@ -286,6 +321,13 @@ object HoodieSparkSchemaConverters {
f.name.matches("member\\d+") && f.nullable
}
}
+
+ private def sparkTypeForVectorElementType(
+ elementType: HoodieSchema.Vector.VectorElementType): DataType =
elementType match {
+ case HoodieSchema.Vector.VectorElementType.FLOAT => FloatType
+ case HoodieSchema.Vector.VectorElementType.DOUBLE => DoubleType
+ case HoodieSchema.Vector.VectorElementType.INT8 => ByteType
+ }
}
private[avro] class IncompatibleSchemaException(msg: String, ex: Throwable =
null) extends Exception(msg, ex)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index f35c77c099f5..4cf23ba17dc8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -35,13 +35,16 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
@@ -84,18 +87,100 @@ import static
org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
*/
public class HoodieSchema implements Serializable {
private static final long serialVersionUID = 1L;
+
/**
* Constant representing a null JSON value, equivalent to
JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining
Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA =
HoodieSchema.create(HoodieSchemaType.NULL);
-
/**
* Constant to use when attaching type metadata to external schema systems
like Spark's StructType.
+ * Stores a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
*/
public static final String TYPE_METADATA_FIELD = "hudi_type";
+ /**
+ * Parses a type descriptor string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
+ * Throws for non-custom logical type names.
+ */
+ public static HoodieSchema parseTypeDescriptor(String descriptor) {
+ Pair<HoodieSchemaType, List<String>> parsedDescriptor =
tokenizeTypeDescriptor(descriptor);
+ HoodieSchemaType type = parsedDescriptor.getLeft();
+ List<String> params = parsedDescriptor.getRight();
+ switch (type) {
+ case VECTOR:
+ if (params.isEmpty()) {
+ throw new IllegalArgumentException("VECTOR type descriptor must
include a dimension parameter");
+ }
+ if (params.size() > 2) {
+ throw new IllegalArgumentException(
+ "VECTOR type descriptor supports at most 2 parameters: dimension
and optional element type");
+ }
+ int dimension;
+ try {
+ dimension = Integer.parseInt(params.get(0));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid VECTOR dimension: " +
params.get(0), e);
+ }
+ Vector.VectorElementType elementType = params.size() > 1
+ ? Vector.VectorElementType.fromString(params.get(1))
+ : Vector.VectorElementType.FLOAT;
+ return createVector(dimension, elementType);
+ case BLOB:
+ if (!params.isEmpty()) {
+ throw new IllegalArgumentException(
+ "BLOB type descriptor does not support parameters, got: " +
params);
+ }
+ return createBlob();
+ default:
+ throw new IllegalArgumentException(
+ "parseTypeDescriptor only supports custom logical types, got: " +
type);
+ }
+ }
+
+ private static Pair<HoodieSchemaType, List<String>>
tokenizeTypeDescriptor(String descriptor) {
+ ValidationUtils.checkArgument(descriptor != null &&
!descriptor.trim().isEmpty(),
+ "Type descriptor cannot be null or empty");
+ int parenStart = descriptor.indexOf('(');
+ String typeName;
+ List<String> params;
+ if (parenStart == -1) {
+ typeName = descriptor.trim();
+ params = Collections.emptyList();
+ } else {
+ if (!descriptor.endsWith(")")) {
+ throw new IllegalArgumentException("Malformed type descriptor, missing
closing ')': " + descriptor);
+ }
+ typeName = descriptor.substring(0, parenStart).trim();
+ String paramStr = descriptor.substring(parenStart + 1,
descriptor.length() - 1).trim();
+ if (paramStr.isEmpty()) {
+ params = Collections.emptyList();
+ } else {
+ params = Arrays.stream(paramStr.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ }
+ }
+ HoodieSchemaType type;
+ try {
+ type = HoodieSchemaType.valueOf(typeName.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unknown Hudi schema type: " +
typeName, e);
+ }
+ if (!CUSTOM_LOGICAL_TYPES.contains(type)) {
+ throw new IllegalArgumentException(
+ "parseTypeDescriptor only supports custom logical types, got: " +
type);
+ }
+ return Pair.of(type, params);
+ }
+
+ private static final Set<HoodieSchemaType> CUSTOM_LOGICAL_TYPES =
+ EnumSet.of(HoodieSchemaType.VECTOR, HoodieSchemaType.BLOB);
+
+
/**
* Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
* These patterns are specifically used for column stats generation and
differ from
@@ -1610,6 +1695,21 @@ public class HoodieSchema implements Serializable {
}
}
+ /**
+ * Represents a fixed-dimension, typed-element vector stored as an Avro
FIXED field.
+ *
+ * <p>Each element is serialized contiguously in little-endian byte order
+ * ({@link VectorLogicalType#VECTOR_BYTE_ORDER}).
+ *
+ * <p>Supported element types and their per-element byte sizes:
+ * <ul>
+ * <li>{@link VectorElementType#FLOAT} — 4 bytes (IEEE 754
single-precision)</li>
+ * <li>{@link VectorElementType#DOUBLE} — 8 bytes (IEEE 754
double-precision)</li>
+ * <li>{@link VectorElementType#INT8} — 1 byte (signed 8-bit
integer)</li>
+ * </ul>
+ *
+ * <p>The total FIXED size is {@code dimension *
elementType.getElementSize()} bytes.
+ */
public static class Vector extends HoodieSchema {
private static final String DEFAULT_NAME = "vector";
@@ -1715,6 +1815,17 @@ public class HoodieSchema implements Serializable {
return HoodieSchemaType.VECTOR;
}
+ /**
+ * Returns the type descriptor string for this vector, e.g. "VECTOR(128)"
or "VECTOR(512, DOUBLE)".
+ * Default element type (FLOAT) is omitted.
+ */
+ public String toTypeDescriptor() {
+ if (getVectorElementType() == VectorElementType.FLOAT) {
+ return "VECTOR(" + getDimension() + ")";
+ }
+ return "VECTOR(" + getDimension() + ", " + getVectorElementType() + ")";
+ }
+
/**
* Creates vector schema with specified dimension and element type.
*
@@ -1987,12 +2098,18 @@ public class HoodieSchema implements Serializable {
}
}
- static class VectorLogicalType extends LogicalType {
+ public static class VectorLogicalType extends LogicalType {
private static final String VECTOR_LOGICAL_TYPE_NAME = "vector";
private static final String PROP_DIMENSION = "dimension";
private static final String PROP_ELEMENT_TYPE = "elementType";
private static final String PROP_STORAGE_BACKING = "storageBacking";
+ /**
+ * Byte order used for serializing/deserializing vector elements (FLOAT,
DOUBLE, etc).
+ * Little-endian is chosen for compatibility with file formats.
+ */
+ public static final ByteOrder VECTOR_BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
+
private final int dimension;
private final String elementType;
private final String storageBacking;
@@ -2329,6 +2446,7 @@ public class HoodieSchema implements Serializable {
* Blob types represent raw binary data. The data can be stored in-line as a
byte array or out-of-line as a reference to a file or offset and length within
that file.
*/
public static class Blob extends HoodieSchema {
+ public static final String TYPE_DESCRIPTOR = "BLOB";
private static final String DEFAULT_NAME = "blob";
private static final List<Schema.Field> BLOB_FIELDS = createBlobFields();
@@ -2374,6 +2492,13 @@ public class HoodieSchema implements Serializable {
return HoodieSchemaType.BLOB;
}
+ /**
+ * Returns the type descriptor string for Blob: "BLOB".
+ */
+ public String toTypeDescriptor() {
+ return TYPE_DESCRIPTOR;
+ }
+
private static Schema createSchema(String name) {
Schema blobSchema = Schema.createRecord(name, null, null, false);
// each instance requires its own copy of the fields list
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 4851d33d1ee6..1032e228034d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -991,7 +991,7 @@ public class TestHoodieSchema {
assertEquals(1536 * 4, vectorFloat.getFixedSize()); // FLOAT is 4 bytes
assertEquals(768 * 8, vectorDouble.getFixedSize()); // DOUBLE is 8 bytes
}
-
+
@Test
void testVectorEquality() {
HoodieSchema.Vector v1 = HoodieSchema.createVector(1536);
@@ -1018,7 +1018,7 @@ public class TestHoodieSchema {
// Different class
assertNotEquals(v1, "string");
}
-
+
@Test
void testVectorSerialization() throws Exception {
// Create vector with DOUBLE element type
@@ -2231,4 +2231,121 @@ public class TestHoodieSchema {
assertFalse(HoodieSchema.createArray(createRecordWithBlob()).isBlobField());
assertFalse(HoodieSchema.createMap(createRecordWithBlob()).isBlobField());
}
+
+ @Test
+ public void testToTypeDescriptorVectorDefaultElementType() {
+ HoodieSchema.Vector vector = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ assertEquals("VECTOR(128)", vector.toTypeDescriptor());
+ }
+
+ @Test
+ public void testToTypeDescriptorVectorNonDefaultElementType() {
+ HoodieSchema.Vector vector = HoodieSchema.createVector(512,
HoodieSchema.Vector.VectorElementType.DOUBLE);
+ assertEquals("VECTOR(512, DOUBLE)", vector.toTypeDescriptor());
+ }
+
+ @Test
+ public void testToTypeDescriptorVectorInt8RoundTrip() {
+ HoodieSchema.Vector vector = HoodieSchema.createVector(32,
HoodieSchema.Vector.VectorElementType.INT8);
+ assertEquals("VECTOR(32, INT8)", vector.toTypeDescriptor());
+
+ HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(32, INT8)");
+ assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+ HoodieSchema.Vector parsedVector = (HoodieSchema.Vector) parsed;
+ assertEquals(32, parsedVector.getDimension());
+ assertEquals(HoodieSchema.Vector.VectorElementType.INT8,
parsedVector.getVectorElementType());
+ }
+
+ @Test
+ public void testParseTypeDescriptorVector() {
+ HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(128)");
+ assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+ HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
+ assertEquals(128, vector.getDimension());
+ assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT,
vector.getVectorElementType());
+ }
+
+ @Test
+ public void testParseTypeDescriptorVectorWithElementType() {
+ HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(512,
DOUBLE)");
+ assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+ HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
+ assertEquals(512, vector.getDimension());
+ assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE,
vector.getVectorElementType());
+ }
+
+ @Test
+ public void testParseTypeDescriptorThrowsForNonCustomType() {
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("INT"));
+ }
+
+ @Test
+ public void testTypeDescriptorRoundTrip() {
+ HoodieSchema.Vector vector = HoodieSchema.createVector(256,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ String typeString = vector.toTypeDescriptor();
+ HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeString);
+ HoodieSchema.Vector parsedVector = (HoodieSchema.Vector) parsed;
+
+ assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+ assertEquals(256, parsedVector.getDimension());
+ assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT,
parsedVector.getVectorElementType());
+
+ // Non-default element type round-trip
+ HoodieSchema.Vector vectorDouble = HoodieSchema.createVector(64,
HoodieSchema.Vector.VectorElementType.DOUBLE);
+ String typeStringDouble = vectorDouble.toTypeDescriptor();
+ HoodieSchema parsedDouble =
HoodieSchema.parseTypeDescriptor(typeStringDouble);
+ HoodieSchema.Vector parsedDoubleVector = (HoodieSchema.Vector)
parsedDouble;
+
+ assertEquals(HoodieSchemaType.VECTOR, parsedDouble.getType());
+ assertEquals(64, parsedDoubleVector.getDimension());
+ assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE,
parsedDoubleVector.getVectorElementType());
+ }
+
+ @Test
+ public void testParseTypeDescriptorMalformed() {
+ // Null and empty inputs
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor(null));
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor(""));
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor(" "));
+
+ // Missing closing parenthesis
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(128"));
+
+ // Non-numeric dimension
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(abc)"));
+
+ // Missing required dimension parameter
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR"));
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR()"));
+
+ // Too many parameters
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, extra)"));
+
+ // Invalid element type
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(128, INVALID)"));
+
+ // Zero and negative dimensions
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(0)"));
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(-1)"));
+
+ // BLOB does not accept parameters
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("BLOB(1024)"));
+ }
+
+ @Test
+ public void testParseTypeDescriptorUnknownType() {
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("UNKNOWN_TYPE"));
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("FOOBAR(123)"));
+ }
+
+ @Test
+ public void testBlobTypeDescriptorRoundTrip() {
+ HoodieSchema.Blob blob = HoodieSchema.createBlob();
+ String typeString = blob.toTypeDescriptor();
+ assertEquals("BLOB", typeString);
+
+ HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeString);
+ assertEquals(HoodieSchemaType.BLOB, parsed.getType());
+ assertInstanceOf(HoodieSchema.Blob.class, parsed);
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
index 3b82ac4a2449..389b8ad03a59 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
@@ -570,6 +570,290 @@ class TestHoodieSchemaConversionUtils extends FunSuite
with Matchers {
internalRowCompare(row1, row2, sparkSchema)
}
+ test("test VECTOR type conversion - Spark to HoodieSchema") {
+ val metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(128)")
+ .build()
+ val struct = new StructType()
+ .add("id", IntegerType, false)
+ .add("embedding", ArrayType(FloatType, containsNull = false), nullable =
false, metadata)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "VectorTest", "test")
+
+ // Verify the embedding field is VECTOR type
+ val embeddingField = hoodieSchema.getField("embedding").get()
+ assert(embeddingField.schema().getType == HoodieSchemaType.VECTOR)
+ assert(embeddingField.schema().isInstanceOf[HoodieSchema.Vector])
+
+ val vectorSchema =
embeddingField.schema().asInstanceOf[HoodieSchema.Vector]
+ assert(vectorSchema.getDimension == 128)
+ assert(vectorSchema.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.FLOAT)
+ assert(!embeddingField.isNullable())
+ }
+
+ test("test VECTOR type conversion - Spark to HoodieSchema for DOUBLE and
INT8") {
+ val doubleMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(64, DOUBLE)")
+ .build()
+ val int8Metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(32, INT8)")
+ .build()
+
+ val struct = new StructType()
+ .add("embedding_double", ArrayType(DoubleType, containsNull = false),
nullable = false, doubleMetadata)
+ .add("embedding_int8", ArrayType(ByteType, containsNull = false),
nullable = false, int8Metadata)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "VectorTypedTest", "test")
+
+ val doubleField = hoodieSchema.getField("embedding_double").get()
+ assert(doubleField.schema().getType == HoodieSchemaType.VECTOR)
+ val doubleVector = doubleField.schema().asInstanceOf[HoodieSchema.Vector]
+ assert(doubleVector.getDimension == 64)
+ assert(doubleVector.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.DOUBLE)
+ assert(!doubleField.isNullable())
+
+ val int8Field = hoodieSchema.getField("embedding_int8").get()
+ assert(int8Field.schema().getType == HoodieSchemaType.VECTOR)
+ val int8Vector = int8Field.schema().asInstanceOf[HoodieSchema.Vector]
+ assert(int8Vector.getDimension == 32)
+ assert(int8Vector.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.INT8)
+ assert(!int8Field.isNullable())
+ }
+
+ test("test VECTOR type conversion - HoodieSchema to Spark") {
+ val vectorSchema = HoodieSchema.createVector(256,
HoodieSchema.Vector.VectorElementType.FLOAT)
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("embedding", vectorSchema)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("VectorTest", "test", null,
fields)
+
+ val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ // Verify the embedding field is ArrayType(FloatType)
+ assert(structType.fields.length == 2)
+ val embeddingField = structType.fields(1)
+ assert(embeddingField.name == "embedding")
+ assert(embeddingField.dataType.isInstanceOf[ArrayType])
+ assert(!embeddingField.nullable)
+
+ val arrayType = embeddingField.dataType.asInstanceOf[ArrayType]
+ assert(arrayType.elementType == FloatType)
+ assert(!arrayType.containsNull)
+
+ // Verify metadata contains type descriptor with dimension
+ assert(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val parsedVector = HoodieSchema.parseTypeDescriptor(
+
embeddingField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(parsedVector.getType == HoodieSchemaType.VECTOR)
+ assert(parsedVector.getDimension == 256)
+ }
+
+ test("test VECTOR type conversion - HoodieSchema to Spark for DOUBLE and
INT8") {
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("embedding_double", HoodieSchema.createVector(64,
HoodieSchema.Vector.VectorElementType.DOUBLE)),
+ HoodieSchemaField.of("embedding_int8", HoodieSchema.createVector(32,
HoodieSchema.Vector.VectorElementType.INT8))
+ )
+ val hoodieSchema = HoodieSchema.createRecord("VectorTypedTest", "test",
null, fields)
+
+ val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ val doubleField = structType.fields(0)
+ assert(doubleField.dataType == ArrayType(DoubleType, containsNull = false))
+ assert(doubleField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val parsedDoubleVector = HoodieSchema.parseTypeDescriptor(
+
doubleField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(parsedDoubleVector.getType == HoodieSchemaType.VECTOR)
+ assert(parsedDoubleVector.getDimension == 64)
+ assert(parsedDoubleVector.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.DOUBLE)
+
+ val int8Field = structType.fields(1)
+ assert(int8Field.dataType == ArrayType(ByteType, containsNull = false))
+ assert(int8Field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val parsedInt8Vector = HoodieSchema.parseTypeDescriptor(
+
int8Field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(parsedInt8Vector.getType == HoodieSchemaType.VECTOR)
+ assert(parsedInt8Vector.getDimension == 32)
+ assert(parsedInt8Vector.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.INT8)
+ }
+
+ test("test VECTOR round-trip conversion - Spark to HoodieSchema to Spark") {
+ val metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(512)")
+ .build()
+ val originalStruct = new StructType()
+ .add("id", LongType, false)
+ .add("vector_field", ArrayType(FloatType, containsNull = false),
nullable = false, metadata)
+ .add("name", StringType, true)
+
+ // Convert Spark -> HoodieSchema -> Spark
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ originalStruct, "RoundTripTest", "test")
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ // Verify structure is preserved
+ assert(convertedStruct.fields.length == originalStruct.fields.length)
+
+ // Verify vector field properties
+ val originalVectorField = originalStruct.fields(1)
+ val convertedVectorField = convertedStruct.fields(1)
+
+ assert(convertedVectorField.name == originalVectorField.name)
+ assert(convertedVectorField.dataType == originalVectorField.dataType)
+ assert(convertedVectorField.nullable == originalVectorField.nullable)
+
+ // Verify metadata is preserved
+
assert(convertedVectorField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val roundTripVector = HoodieSchema.parseTypeDescriptor(
+
convertedVectorField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(roundTripVector.getType == HoodieSchemaType.VECTOR)
+ assert(roundTripVector.getDimension == 512)
+
+ // Verify array properties
+ val convertedArrayType =
convertedVectorField.dataType.asInstanceOf[ArrayType]
+ assert(convertedArrayType.elementType == FloatType)
+ assert(!convertedArrayType.containsNull)
+ }
+
+ test("test VECTOR round-trip conversion - Spark to HoodieSchema to Spark for
DOUBLE and INT8") {
+ val doubleMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(64, DOUBLE)")
+ .build()
+ val int8Metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(32, INT8)")
+ .build()
+ val originalStruct = new StructType()
+ .add("id", LongType, false)
+ .add("vector_double", ArrayType(DoubleType, containsNull = false),
nullable = false, doubleMetadata)
+ .add("vector_int8", ArrayType(ByteType, containsNull = false), nullable
= false, int8Metadata)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ originalStruct, "VectorTypedRoundTripTest", "test")
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ val originalDouble = originalStruct.fields(1)
+ val convertedDouble = convertedStruct.fields(1)
+ assert(convertedDouble.name == originalDouble.name)
+ assert(convertedDouble.dataType == originalDouble.dataType)
+ assert(convertedDouble.nullable == originalDouble.nullable)
+ val convertedDoubleVector = HoodieSchema.parseTypeDescriptor(
+
convertedDouble.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(convertedDoubleVector.getType == HoodieSchemaType.VECTOR)
+ assert(convertedDoubleVector.getDimension == 64)
+ assert(convertedDoubleVector.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.DOUBLE)
+
+ val originalInt8 = originalStruct.fields(2)
+ val convertedInt8 = convertedStruct.fields(2)
+ assert(convertedInt8.name == originalInt8.name)
+ assert(convertedInt8.dataType == originalInt8.dataType)
+ assert(convertedInt8.nullable == originalInt8.nullable)
+ val convertedInt8Vector = HoodieSchema.parseTypeDescriptor(
+
convertedInt8.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(convertedInt8Vector.getType == HoodieSchemaType.VECTOR)
+ assert(convertedInt8Vector.getDimension == 32)
+ assert(convertedInt8Vector.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.INT8)
+ }
+
+ test("test VECTOR round-trip conversion - HoodieSchema to Spark to
HoodieSchema") {
+ val originalVectorSchema = HoodieSchema.createVector(1024,
HoodieSchema.Vector.VectorElementType.FLOAT)
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("embedding", originalVectorSchema),
+ HoodieSchemaField.of("metadata",
HoodieSchema.createNullable(HoodieSchemaType.STRING))
+ )
+ val originalHoodieSchema = HoodieSchema.createRecord("RoundTripTest",
"test", null, fields)
+
+ // Convert HoodieSchema -> Spark -> HoodieSchema
+ val sparkStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(originalHoodieSchema)
+ val convertedHoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ sparkStruct, "RoundTripTest", "test")
+
+ // Verify the vector field is preserved
+ val convertedEmbeddingField =
convertedHoodieSchema.getField("embedding").get()
+ assert(convertedEmbeddingField.schema().getType == HoodieSchemaType.VECTOR)
+ assert(convertedEmbeddingField.schema().isInstanceOf[HoodieSchema.Vector])
+
+ val convertedVectorSchema =
convertedEmbeddingField.schema().asInstanceOf[HoodieSchema.Vector]
+ assert(convertedVectorSchema.getDimension == 1024)
+ assert(convertedVectorSchema.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.FLOAT)
+ assert(!convertedEmbeddingField.isNullable())
+
+ // Verify other fields are preserved
+ assert(convertedHoodieSchema.getFields.size() == 3)
+ assert(convertedHoodieSchema.getField("id").get().schema().getType ==
HoodieSchemaType.LONG)
+ assert(convertedHoodieSchema.getField("metadata").get().isNullable())
+ }
+
+ test("test nullable VECTOR round-trip conversion") {
+ val metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(64)")
+ .build()
+ val originalStruct = new StructType()
+ .add("id", LongType, false)
+ .add("embedding", ArrayType(FloatType, containsNull = false), nullable =
true, metadata)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ originalStruct, "NullableVectorTest", "test")
+
+ // Verify the vector field is nullable in HoodieSchema
+ val embeddingField = hoodieSchema.getField("embedding").get()
+ assert(embeddingField.isNullable())
+ val vectorSchema =
embeddingField.schema().getNonNullType().asInstanceOf[HoodieSchema.Vector]
+ assert(vectorSchema.getDimension == 64)
+ assert(vectorSchema.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.FLOAT)
+
+ // Convert back to Spark
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+ val convertedField = convertedStruct.fields(1)
+ assert(convertedField.nullable)
+ assert(convertedField.dataType == ArrayType(FloatType, containsNull =
false))
+ assert(convertedField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val parsedVector = HoodieSchema.parseTypeDescriptor(
+
convertedField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(parsedVector.getDimension == 64)
+ }
+
+ test("test nullable VECTOR via UNION preserves metadata in HoodieSchema to
Spark conversion") {
+ val vectorSchema = HoodieSchema.createVector(256,
HoodieSchema.Vector.VectorElementType.FLOAT)
+ val nullableVectorSchema = HoodieSchema.createNullable(vectorSchema)
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("embedding", nullableVectorSchema)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("NullableVectorUnionTest",
"test", null, fields)
+
+ val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ val embeddingField = structType.fields(1)
+ assert(embeddingField.name == "embedding")
+ assert(embeddingField.nullable)
+ assert(embeddingField.dataType == ArrayType(FloatType, containsNull =
false))
+
+ // The key assertion: metadata must survive the UNION unwrapping
(.copy(nullable = true))
+ assert(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val parsedVector = HoodieSchema.parseTypeDescriptor(
+
embeddingField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assert(parsedVector.getDimension == 256)
+ assert(parsedVector.getVectorElementType ==
HoodieSchema.Vector.VectorElementType.FLOAT)
+ }
+
+ test("test VECTOR element type mismatch throws error") {
+ // Metadata says DOUBLE, but Spark array element type is Float
+ val mismatchMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(128, DOUBLE)")
+ .build()
+ val struct = new StructType()
+ .add("embedding", ArrayType(FloatType, containsNull = false), nullable =
false, mismatchMetadata)
+
+ the[Exception] thrownBy {
+ HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "MismatchTest", "test")
+ } should have message (
+ "VECTOR element type mismatch for field embedding: metadata requires
DOUBLE, Spark array has FloatType")
+ }
+
private def internalRowCompare(expected: Any, actual: Any, schema:
DataType): Unit = {
schema match {
case StructType(fields) =>
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
index 31367060e5dc..bea088348f6b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala
@@ -19,12 +19,18 @@ package org.apache.spark.sql.avro
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, IntWrapper}
-import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField,
HoodieSchemaType}
import org.apache.avro.generic.GenericData
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals}
import org.junit.jupiter.api.Test
+import java.nio.{ByteBuffer, ByteOrder}
+import java.util
+
class TestAvroSerDe extends SparkAdapterSupport {
@Test
@@ -59,4 +65,130 @@ class TestAvroSerDe extends SparkAdapterSupport {
assertEquals(originalAvroRecord, deserializedAvroRecord)
}
+
+ @Test
+ def testVectorFloatSerDe(): Unit = {
+ val dimension = 4
+ val vectorSchema = HoodieSchema.createVector(dimension,
HoodieSchema.Vector.VectorElementType.FLOAT)
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("embedding", vectorSchema)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("FloatVectorRecord", "test",
null, fields)
+ val avroSchema = hoodieSchema.toAvroSchema
+
+ // Build a GenericData.Record with float vector data
+ val buffer = ByteBuffer.allocate(dimension *
4).order(ByteOrder.LITTLE_ENDIAN)
+ buffer.putFloat(1.0f).putFloat(2.5f).putFloat(-3.0f).putFloat(0.0f)
+ val fixedField = new
GenericData.Fixed(avroSchema.getField("embedding").schema(), buffer.array())
+
+ val originalRecord = new GenericData.Record(avroSchema)
+ originalRecord.put("embedding", fixedField)
+
+ val (catalystSchema, _) =
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+ val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema,
catalystSchema)
+ val serializer = sparkAdapter.createAvroSerializer(catalystSchema,
hoodieSchema, nullable = false)
+
+ val row = deserializer.deserialize(originalRecord).get
+ val deserializedRecord = serializer.serialize(row)
+
+ assertEquals(originalRecord, deserializedRecord)
+ }
+
+ @Test
+ def testVectorDoubleSerDe(): Unit = {
+ val dimension = 3
+ val vectorSchema = HoodieSchema.createVector(dimension,
HoodieSchema.Vector.VectorElementType.DOUBLE)
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("embedding", vectorSchema)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("DoubleVectorRecord", "test",
null, fields)
+ val avroSchema = hoodieSchema.toAvroSchema
+
+ val buffer = ByteBuffer.allocate(dimension *
8).order(ByteOrder.LITTLE_ENDIAN)
+ buffer.putDouble(1.0).putDouble(-2.5).putDouble(3.14159)
+ val fixedField = new
GenericData.Fixed(avroSchema.getField("embedding").schema(), buffer.array())
+
+ val originalRecord = new GenericData.Record(avroSchema)
+ originalRecord.put("embedding", fixedField)
+
+ val (catalystSchema, _) =
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+ val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema,
catalystSchema)
+ val serializer = sparkAdapter.createAvroSerializer(catalystSchema,
hoodieSchema, nullable = false)
+
+ val row = deserializer.deserialize(originalRecord).get
+ val deserializedRecord = serializer.serialize(row)
+
+ assertEquals(originalRecord, deserializedRecord)
+ }
+
+ @Test
+ def testVectorFloatByteOrder(): Unit = {
+ val dimension = 2
+ val vectorSchema = HoodieSchema.createVector(dimension,
HoodieSchema.Vector.VectorElementType.FLOAT)
+ val fields = util.Arrays.asList(
+ HoodieSchemaField.of("embedding", vectorSchema)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("FloatByteOrderRecord",
"test", null, fields)
+ val avroSchema = hoodieSchema.toAvroSchema
+
+ // Build input: Spark array of floats → serialize → Avro Fixed
+ val inputFloats = Array(1.0f, -2.0f)
+ val sparkRow = {
+ val row = new GenericInternalRow(1)
+ row.update(0, ArrayData.toArrayData(inputFloats))
+ row
+ }
+
+ val (catalystSchema, _) =
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+ val serializer = sparkAdapter.createAvroSerializer(catalystSchema,
hoodieSchema, nullable = false)
+ val serialized =
serializer.serialize(sparkRow).asInstanceOf[GenericData.Record]
+ val fixedBytes =
serialized.get("embedding").asInstanceOf[GenericData.Fixed].bytes()
+
+ // Assert little-endian IEEE 754 layout byte-by-byte:
+ // 1.0f LE = 0x00 0x00 0x80 0x3F
+ // -2.0f LE = 0x00 0x00 0x00 0xC0
+ val expectedBytes = Array[Byte](
+ 0x00, 0x00, 0x80.toByte, 0x3F,
+ 0x00, 0x00, 0x00, 0xC0.toByte
+ )
+ assertArrayEquals(expectedBytes, fixedBytes,
+ "Serialized bytes must match little-endian IEEE 754 float layout")
+
+ // Deserialize the raw bytes back and verify float values
+ val fixedField = new
GenericData.Fixed(avroSchema.getField("embedding").schema(), fixedBytes)
+ val record = new GenericData.Record(avroSchema)
+ record.put("embedding", fixedField)
+
+ val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema,
catalystSchema)
+ val row = deserializer.deserialize(record).get.asInstanceOf[InternalRow]
+ val resultArray = row.getArray(0)
+ assertEquals(1.0f, resultArray.getFloat(0), 0.0f)
+ assertEquals(-2.0f, resultArray.getFloat(1), 0.0f)
+ }
+
+ @Test
+ def testVectorInt8SerDe(): Unit = {
+ val dimension = 5
+ val vectorSchema = HoodieSchema.createVector(dimension,
HoodieSchema.Vector.VectorElementType.INT8)
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("embedding", vectorSchema)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("Int8VectorRecord", "test",
null, fields)
+ val avroSchema = hoodieSchema.toAvroSchema
+
+ val bytes = Array[Byte](1, -2, 127, -128, 0)
+ val fixedField = new
GenericData.Fixed(avroSchema.getField("embedding").schema(), bytes)
+
+ val originalRecord = new GenericData.Record(avroSchema)
+ originalRecord.put("embedding", fixedField)
+
+ val (catalystSchema, _) =
HoodieSparkSchemaConverters.toSqlType(hoodieSchema)
+ val deserializer = sparkAdapter.createAvroDeserializer(hoodieSchema,
catalystSchema)
+ val serializer = sparkAdapter.createAvroSerializer(catalystSchema,
hoodieSchema, nullable = false)
+
+ val row = deserializer.deserialize(originalRecord).get
+ val deserializedRecord = serializer.serialize(row)
+
+ assertEquals(originalRecord, deserializedRecord)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
index 038dcafab18b..7b2dfdbfbab1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.avro
import org.apache.hudi.avro.model.HoodieMetadataColumnStats
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField,
HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
import org.apache.avro.JsonProperties
-import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField,
StructType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, FloatType,
MetadataBuilder, StructField, StructType}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
import org.junit.jupiter.api.Test
@@ -181,6 +182,17 @@ class TestSchemaConverters {
assertEquals(HoodieSchemaType.BLOB,
nestedMapType.getField("nested_blob").get.schema.getType)
}
+ @Test
+ def testVectorContainsNullThrows(): Unit = {
+ val metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ val sparkType = ArrayType(FloatType, containsNull = true)
+ assertThrows(classOf[HoodieSchemaException], () => {
+ HoodieSparkSchemaConverters.toHoodieType(sparkType, nullable = false,
metadata = metadata)
+ })
+ }
+
/**
* Validates the content of the blob fields to ensure the fields match our
expectations.
* @param dataType the StructType containing the blob fields to validate
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 6466d02e3d0f..b5eba6be24cd 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.math.BigDecimal
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -164,6 +168,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
s"Avro logical type $other cannot be converted to SQL type
${TimestampNTZType.sql}.")
}
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val elementSize = vecElementType.getElementSize
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+ val expectedSize = Math.multiplyExact(dimension, elementSize)
+ if (bytes.length != expectedSize) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR byte size mismatch: expected=$expectedSize,
actual=${bytes.length}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val floats = new Array[Float](dimension)
+ var i = 0; while (i < dimension) { floats(i) =
buffer.getFloat(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(floats))
+ case DoubleType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val doubles = new Array[Double](dimension)
+ var i = 0; while (i < dimension) { doubles(i) =
buffer.getDouble(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(doubles))
+ case ByteType =>
+ updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+
// Before we upgrade Avro to 1.8 for logical type support, spark-avro
converts Long to Date.
// For backward compatibility, we still keep this conversion.
case (LONG, DateType) => (updater, ordinal, value) =>
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 874b90f88ae8..a1241b72e58b 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import
org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types._
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -145,6 +149,40 @@ private[sql] class AvroSerializer(rootCatalystType:
DataType,
decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val bufferSize = Math.multiplyExact(dimension,
vecElementType.getElementSize)
+ (getter, ordinal) => {
+ val arrayData = getter.getArray(ordinal)
+ if (arrayData.numElements() != dimension) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+ s"expected=$dimension, actual=${arrayData.numElements()}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case DoubleType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case ByteType =>
+ val bytes = new Array[Byte](dimension)
+ var i = 0; while (i < dimension) { bytes(i) =
arrayData.getByte(i); i += 1 }
+ new Fixed(avroType, bytes)
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql},
avroType = $avroType)")
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql}, avroType =
$avroType)")
+ }
+
case (StringType, ENUM) =>
val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
(getter, ordinal) =>
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 6466d02e3d0f..b5eba6be24cd 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.math.BigDecimal
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -164,6 +168,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
s"Avro logical type $other cannot be converted to SQL type
${TimestampNTZType.sql}.")
}
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val elementSize = vecElementType.getElementSize
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+ val expectedSize = Math.multiplyExact(dimension, elementSize)
+ if (bytes.length != expectedSize) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR byte size mismatch: expected=$expectedSize,
actual=${bytes.length}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val floats = new Array[Float](dimension)
+ var i = 0; while (i < dimension) { floats(i) =
buffer.getFloat(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(floats))
+ case DoubleType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val doubles = new Array[Double](dimension)
+ var i = 0; while (i < dimension) { doubles(i) =
buffer.getDouble(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(doubles))
+ case ByteType =>
+ updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+
// Before we upgrade Avro to 1.8 for logical type support, spark-avro
converts Long to Date.
// For backward compatibility, we still keep this conversion.
case (LONG, DateType) => (updater, ordinal, value) =>
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 874b90f88ae8..a1241b72e58b 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import
org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types._
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -145,6 +149,40 @@ private[sql] class AvroSerializer(rootCatalystType:
DataType,
decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val bufferSize = Math.multiplyExact(dimension,
vecElementType.getElementSize)
+ (getter, ordinal) => {
+ val arrayData = getter.getArray(ordinal)
+ if (arrayData.numElements() != dimension) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+ s"expected=$dimension, actual=${arrayData.numElements()}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case DoubleType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case ByteType =>
+ val bytes = new Array[Byte](dimension)
+ var i = 0; while (i < dimension) { bytes(i) =
arrayData.getByte(i); i += 1 }
+ new Fixed(avroType, bytes)
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql},
avroType = $avroType)")
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql}, avroType =
$avroType)")
+ }
+
case (StringType, ENUM) =>
val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
(getter, ordinal) =>
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 8623d24c6f58..d10ff1edfe87 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.math.BigDecimal
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -164,6 +168,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
s"Avro logical type $other cannot be converted to SQL type
${TimestampNTZType.sql}.")
}
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val elementSize = vecElementType.getElementSize
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+ val expectedSize = Math.multiplyExact(dimension, elementSize)
+ if (bytes.length != expectedSize) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR byte size mismatch: expected=$expectedSize,
actual=${bytes.length}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val floats = new Array[Float](dimension)
+ var i = 0; while (i < dimension) { floats(i) =
buffer.getFloat(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(floats))
+ case DoubleType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val doubles = new Array[Double](dimension)
+ var i = 0; while (i < dimension) { doubles(i) =
buffer.getDouble(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(doubles))
+ case ByteType =>
+ updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+
// Before we upgrade Avro to 1.8 for logical type support, spark-avro
converts Long to Date.
// For backward compatibility, we still keep this conversion.
case (LONG, DateType) => (updater, ordinal, value) =>
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 968cc8ff02c6..756ef82a55d2 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -35,6 +38,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy,
SQLConf}
import org.apache.spark.sql.types._
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -144,6 +148,40 @@ private[sql] class AvroSerializer(rootCatalystType:
DataType,
decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val bufferSize = Math.multiplyExact(dimension,
vecElementType.getElementSize)
+ (getter, ordinal) => {
+ val arrayData = getter.getArray(ordinal)
+ if (arrayData.numElements() != dimension) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+ s"expected=$dimension, actual=${arrayData.numElements()}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case DoubleType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case ByteType =>
+ val bytes = new Array[Byte](dimension)
+ var i = 0; while (i < dimension) { bytes(i) =
arrayData.getByte(i); i += 1 }
+ new Fixed(avroType, bytes)
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql},
avroType = $avroType)")
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql}, avroType =
$avroType)")
+ }
+
case (StringType, ENUM) =>
val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
(getter, ordinal) =>
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 8623d24c6f58..926e5b9f7dbd 100644
---
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -36,6 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.math.BigDecimal
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -212,6 +216,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
updater.setDecimal(ordinal, decimal)
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val elementSize = vecElementType.getElementSize
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+ val expectedSize = Math.multiplyExact(dimension, elementSize)
+ if (bytes.length != expectedSize) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR byte size mismatch: expected=$expectedSize,
actual=${bytes.length}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val floats = new Array[Float](dimension)
+ var i = 0; while (i < dimension) { floats(i) =
buffer.getFloat(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(floats))
+ case DoubleType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ val doubles = new Array[Double](dimension)
+ var i = 0; while (i < dimension) { doubles(i) =
buffer.getDouble(); i += 1 }
+ updater.set(ordinal, ArrayData.toArrayData(doubles))
+ case ByteType =>
+ updater.set(ordinal, ArrayData.toArrayData(bytes.clone()))
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+
case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
val bigDecimal =
decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 968cc8ff02c6..756ef82a55d2 100644
---
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.avro
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType
+
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{LocalTimestampMicros,
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -35,6 +38,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy,
SQLConf}
import org.apache.spark.sql.types._
import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.TimeZone
import scala.collection.JavaConverters._
@@ -144,6 +148,40 @@ private[sql] class AvroSerializer(rootCatalystType:
DataType,
decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (ArrayType(elementType, false), FIXED) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val vecElementType =
HoodieSchema.Vector.VectorElementType.fromString(vectorLogicalType.getElementType)
+ val bufferSize = Math.multiplyExact(dimension,
vecElementType.getElementSize)
+ (getter, ordinal) => {
+ val arrayData = getter.getArray(ordinal)
+ if (arrayData.numElements() != dimension) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+ s"expected=$dimension, actual=${arrayData.numElements()}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putFloat(arrayData.getFloat(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case DoubleType =>
+ val buffer =
ByteBuffer.allocate(bufferSize).order(VectorLogicalType.VECTOR_BYTE_ORDER)
+ var i = 0; while (i < dimension) {
buffer.putDouble(arrayData.getDouble(i)); i += 1 }
+ new Fixed(avroType, buffer.array())
+ case ByteType =>
+ val bytes = new Array[Byte](dimension)
+ var i = 0; while (i < dimension) { bytes(i) =
arrayData.getByte(i); i += 1 }
+ new Fixed(avroType, bytes)
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql},
avroType = $avroType)")
+ }
+ }
+ case _ => throw new IncompatibleSchemaException(errorPrefix +
+ s"schema is incompatible (sqlType = ${catalystType.sql}, avroType =
$avroType)")
+ }
+
case (StringType, ENUM) =>
val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
(getter, ordinal) =>