the-other-tim-brown commented on code in PR #18190:
URL: https://github.com/apache/hudi/pull/18190#discussion_r2850316235
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -84,18 +84,98 @@
*/
public class HoodieSchema implements Serializable {
private static final long serialVersionUID = 1L;
+
/**
* Constant representing a null JSON value, equivalent to
JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining
Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA =
HoodieSchema.create(HoodieSchemaType.NULL);
-
/**
* Constant to use when attaching type metadata to external schema systems
like Spark's StructType.
+ * Stores a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
*/
public static final String TYPE_METADATA_FIELD = "hudi_type";
+ /**
+ * Converts a HoodieSchema to its parameterized type string for custom Hudi
logical types
+ * such as VECTOR and BLOB. Only supports custom logical types — throws for
standard types.
+ * Parameterized types include positional parameters: "VECTOR(128)",
"VECTOR(128, DOUBLE)".
+ * Default parameters are omitted: VECTOR(dim) implies elementType=FLOAT.
+ */
+ public static String toTypeString(HoodieSchema schema) {
Review Comment:
Can we make these instance methods instead?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -84,18 +84,98 @@
*/
public class HoodieSchema implements Serializable {
private static final long serialVersionUID = 1L;
+
/**
* Constant representing a null JSON value, equivalent to
JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining
Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA =
HoodieSchema.create(HoodieSchemaType.NULL);
-
/**
* Constant to use when attaching type metadata to external schema systems
like Spark's StructType.
+ * Stores a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
*/
public static final String TYPE_METADATA_FIELD = "hudi_type";
+ /**
+ * Converts a HoodieSchema to its parameterized type string for custom Hudi
logical types
+ * such as VECTOR and BLOB. Only supports custom logical types — throws for
standard types.
+ * Parameterized types include positional parameters: "VECTOR(128)",
"VECTOR(128, DOUBLE)".
+ * Default parameters are omitted: VECTOR(dim) implies elementType=FLOAT.
+ */
+ public static String toTypeString(HoodieSchema schema) {
+ HoodieSchemaType type = schema.getType();
+ switch (type) {
+ case VECTOR:
+ Vector v = (Vector) schema;
+ if (v.getVectorElementType() == Vector.VectorElementType.FLOAT) {
+ return "VECTOR(" + v.getDimension() + ")";
+ }
+ return "VECTOR(" + v.getDimension() + ", " +
v.getVectorElementType().getDataType() + ")";
+ default:
+ throw new IllegalArgumentException(
+ "toTypeString only supports custom logical types, got: " + type);
+ }
+ }
+
+ /**
+ * Parses a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)" or "VECTOR(512, DOUBLE)".
+ * Throws for non-custom logical type names.
+ */
+ public static TypeDescriptor parseTypeString(String descriptor) {
+ int parenStart = descriptor.indexOf('(');
+ String typeName;
+ List<String> params;
+ if (parenStart == -1) {
+ typeName = descriptor.trim();
+ params = Collections.emptyList();
+ } else {
+ typeName = descriptor.substring(0, parenStart).trim();
+ String paramStr = descriptor.substring(parenStart + 1,
descriptor.length() - 1).trim();
+ params = Arrays.stream(paramStr.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ }
+ HoodieSchemaType type = HoodieSchemaType.valueOf(typeName);
+ if (!CUSTOM_LOGICAL_TYPES.contains(type)) {
+ throw new IllegalArgumentException(
+ "parseTypeString only supports custom logical types, got: " + type);
+ }
+ return new TypeDescriptor(type, params);
+ }
+
+ private static final java.util.Set<HoodieSchemaType> CUSTOM_LOGICAL_TYPES =
+ java.util.EnumSet.of(HoodieSchemaType.VECTOR);
Review Comment:
nit: import these
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -144,6 +144,34 @@ private[sql] class AvroSerializer(rootCatalystType:
DataType,
decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
+ // Handle VECTOR logical type
+ case (ArrayType(FloatType, false), FIXED)
+ if avroType.getLogicalType != null &&
+ avroType.getLogicalType.getName == "vector" =>
+
+ val dimension =
avroType.getObjectProp("dimension").asInstanceOf[Number].intValue()
+
+ (getter, ordinal) => {
+ val arrayData = getter.getArray(ordinal)
+
+ // Validate dimension
+ if (arrayData.numElements() != dimension) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR dimension mismatch at ${toFieldStr(catalystPath)}: " +
+ s"expected=$dimension, actual=${arrayData.numElements()}")
+ }
+
+ // Pack floats into bytes (little-endian, 4 bytes per float)
+ val buffer = java.nio.ByteBuffer.allocate(dimension * 4)
Review Comment:
Similarly, import these classes
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -84,18 +84,98 @@
*/
public class HoodieSchema implements Serializable {
private static final long serialVersionUID = 1L;
+
/**
* Constant representing a null JSON value, equivalent to
JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining
Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA =
HoodieSchema.create(HoodieSchemaType.NULL);
-
/**
* Constant to use when attaching type metadata to external schema systems
like Spark's StructType.
+ * Stores a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
*/
public static final String TYPE_METADATA_FIELD = "hudi_type";
+ /**
+ * Converts a HoodieSchema to its parameterized type string for custom Hudi
logical types
+ * such as VECTOR and BLOB. Only supports custom logical types — throws for
standard types.
+ * Parameterized types include positional parameters: "VECTOR(128)",
"VECTOR(128, DOUBLE)".
+ * Default parameters are omitted: VECTOR(dim) implies elementType=FLOAT.
+ */
+ public static String toTypeString(HoodieSchema schema) {
+ HoodieSchemaType type = schema.getType();
+ switch (type) {
+ case VECTOR:
+ Vector v = (Vector) schema;
+ if (v.getVectorElementType() == Vector.VectorElementType.FLOAT) {
+ return "VECTOR(" + v.getDimension() + ")";
+ }
+ return "VECTOR(" + v.getDimension() + ", " +
v.getVectorElementType().getDataType() + ")";
+ default:
+ throw new IllegalArgumentException(
+ "toTypeString only supports custom logical types, got: " + type);
+ }
+ }
+
+ /**
+ * Parses a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)" or "VECTOR(512, DOUBLE)".
+ * Throws for non-custom logical type names.
+ */
+ public static TypeDescriptor parseTypeString(String descriptor) {
Review Comment:
Can this directly return a `HoodieSchema`?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -44,7 +44,7 @@ object HoodieSparkSchemaConverters {
/**
* Internal wrapper for SQL data type and nullability.
*/
- case class SchemaType(dataType: DataType, nullable: Boolean)
+ case class SchemaType(dataType: DataType, nullable: Boolean, metadata:
Option[Metadata] = None)
Review Comment:
I think this approach is much cleaner than what I did for Blobs. Can you
update the blob handling to also use this so it is consistent?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -196,10 +233,20 @@ object HoodieSparkSchemaConverters {
val newRecordNames = existingRecordNames + fullName
val fields = hoodieSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
- val commentMetadata = if (f.doc().isPresent &&
!f.doc().get().isEmpty) {
- new MetadataBuilder().putString("comment", f.doc().get()).build()
- } else {
- Metadata.empty
+ val commentMetadata = schemaType.metadata match {
Review Comment:
Should we combine the handling at line 252?
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -164,6 +164,31 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
s"Avro logical type $other cannot be converted to SQL type
${TimestampNTZType.sql}.")
}
+ // Handle VECTOR logical type
+ case (FIXED, ArrayType(FloatType, false))
+ if avroType.getLogicalType != null &&
+ avroType.getLogicalType.getName == "vector" =>
+
+ val dimension =
avroType.getObjectProp("dimension").asInstanceOf[Number].intValue()
+
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+
+ // Validate size
+ val expectedSize = dimension * 4
+ if (bytes.length != expectedSize) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR byte size mismatch: expected=$expectedSize,
actual=${bytes.length}")
+ }
+
+ // Unpack bytes to float array (little-endian)
+ val buffer = java.nio.ByteBuffer.wrap(bytes)
+ .order(java.nio.ByteOrder.LITTLE_ENDIAN)
+ val floats = (0 until dimension).map(_ => buffer.getFloat()).toArray
Review Comment:
I'm curious if there is any optimization we should do here like set the size
of the array up front or if that is not required in scala
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -164,6 +164,31 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
s"Avro logical type $other cannot be converted to SQL type
${TimestampNTZType.sql}.")
}
+ // Handle VECTOR logical type
+ case (FIXED, ArrayType(FloatType, false))
+ if avroType.getLogicalType != null &&
+ avroType.getLogicalType.getName == "vector" =>
+
+ val dimension =
avroType.getObjectProp("dimension").asInstanceOf[Number].intValue()
+
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+
+ // Validate size
+ val expectedSize = dimension * 4
+ if (bytes.length != expectedSize) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR byte size mismatch: expected=$expectedSize,
actual=${bytes.length}")
+ }
+
+ // Unpack bytes to float array (little-endian)
+ val buffer = java.nio.ByteBuffer.wrap(bytes)
+ .order(java.nio.ByteOrder.LITTLE_ENDIAN)
Review Comment:
nit: import these
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -144,6 +144,34 @@ private[sql] class AvroSerializer(rootCatalystType:
DataType,
decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
+ // Handle VECTOR logical type
+ case (ArrayType(FloatType, false), FIXED)
+ if avroType.getLogicalType != null &&
+ avroType.getLogicalType.getName == "vector" =>
+
+ val dimension =
avroType.getObjectProp("dimension").asInstanceOf[Number].intValue()
Review Comment:
nitpick: can you cast the logical type and then use the getter method on
that class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]