This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 75918eabcb33 feat(blob): Create blobs in Spark SQL (#18347)
75918eabcb33 is described below
commit 75918eabcb33061d81f23a237192d7ab4bdb8c07
Author: Tim Brown <[email protected]>
AuthorDate: Mon Mar 30 23:34:29 2026 -0400
feat(blob): Create blobs in Spark SQL (#18347)
* add in code and test for handling blob type in SQL table creation
* remove spacing changes
* add basic testing, cleanup constants in HoodieSchema
* make blob type return a constant, add test for blob keyword without blob type
* style
* fix style
---
.../apache/hudi/common/schema/HoodieSchema.java | 7 +-
.../avro/AvroSchemaConverterWithTimestampNTZ.java | 1 +
.../org/apache/spark/sql/types/BlobType.scala | 41 ++++++
.../org/apache/hudi/blob/BlobTestHelpers.scala | 129 +++++++++++++++++++
.../org/apache/hudi/blob/TestBlobSupport.scala | 142 +++++++++++++++++++++
.../spark/sql/hudi/ddl/TestCreateTable.scala | 124 ++++++++++++++++++
.../HoodieSpark3_3ExtendedSqlAstBuilder.scala | 29 ++++-
.../parser/HoodieSpark3_3ExtendedSqlParser.scala | 3 +-
.../HoodieSpark3_4ExtendedSqlAstBuilder.scala | 29 ++++-
.../parser/HoodieSpark3_4ExtendedSqlParser.scala | 3 +-
.../HoodieSpark3_5ExtendedSqlAstBuilder.scala | 29 ++++-
.../parser/HoodieSpark3_5ExtendedSqlParser.scala | 3 +-
.../HoodieSpark4_0ExtendedSqlAstBuilder.scala | 29 ++++-
.../parser/HoodieSpark4_0ExtendedSqlParser.scala | 3 +-
14 files changed, 550 insertions(+), 22 deletions(-)
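For context (not part of the patch): the TestCreateTable changes below show the new DDL surface. A minimal sketch of what this commit enables, assuming an active SparkSession named spark (table name and location are illustrative):

    spark.sql(
      """
        |CREATE TABLE blob_demo (
        |  id BIGINT,
        |  video BLOB COMMENT 'Product demonstration video'
        |) USING hudi
        |LOCATION '/tmp/blob_demo'
        |TBLPROPERTIES (primaryKey = 'id')
        |""".stripMargin)
    // The BLOB keyword is handled by the Hudi-extended SQL parser; the column is
    // stored as the blob struct defined in HoodieSchema.Blob and tagged with
    // Hudi's BLOB type metadata.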
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 01a0963a6623..e20aeea24911 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -2548,7 +2548,10 @@ public class HoodieSchema implements Serializable {
public static final String TYPE_DESCRIPTOR = "BLOB";
private static final String DEFAULT_NAME = "blob";
private static final List<Schema.Field> BLOB_FIELDS = createBlobFields();
+ private static final int REFERENCE_FIELD_COUNT = AvroSchemaUtils.getNonNullTypeFromUnion(BLOB_FIELDS.get(2).schema()).getFields().size();
+ public static final String INLINE = "INLINE";
+ public static final String OUT_OF_LINE = "OUT_OF_LINE";
public static final String TYPE = "type";
public static final String INLINE_DATA_FIELD = "data";
public static final String EXTERNAL_REFERENCE = "reference";
@@ -2564,7 +2567,7 @@ public class HoodieSchema implements Serializable {
}
public static int getReferenceFieldCount() {
- return AvroSchemaUtils.getNonNullTypeFromUnion(BLOB_FIELDS.get(2).schema()).getFields().size();
+ return REFERENCE_FIELD_COUNT;
}
/**
@@ -2622,7 +2625,7 @@ public class HoodieSchema implements Serializable {
referenceField.setFields(referenceFields);
return Arrays.asList(
- new Schema.Field(TYPE, Schema.createEnum("blob_storage_type", null, null, Arrays.asList("INLINE", "OUT_OF_LINE")), null, null),
+ new Schema.Field(TYPE, Schema.createEnum("blob_storage_type", null, null, Arrays.asList(INLINE, OUT_OF_LINE)), null, null),
new Schema.Field(INLINE_DATA_FIELD, AvroSchemaUtils.createNullableSchema(bytesField), null, Schema.Field.NULL_DEFAULT_VALUE),
new Schema.Field(EXTERNAL_REFERENCE, AvroSchemaUtils.createNullableSchema(referenceField), null, Schema.Field.NULL_DEFAULT_VALUE)
);
diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
index 6641e70cbaa3..8895186fd055 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
@@ -221,6 +221,7 @@ public class AvroSchemaConverterWithTimestampNTZ extends HoodieAvroParquetSchema
}
break;
case RECORD:
+ case BLOB:
return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath));
case ENUM:
builder = Types.primitive(BINARY, repetition).as(enumType());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/types/BlobType.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/types/BlobType.scala
new file mode 100644
index 000000000000..9be8e9b7a008
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/types/BlobType.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.hudi.common.schema.HoodieSchema
+
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
+
+/**
+ * Factory object for creating Spark StructType representation of {@link HoodieSchema.Blob}.
+ *
+ * BLOB is a logical type in Hudi that represents binary large objects. The data can be
+ * stored inline (as bytes) or out-of-line (as a reference to a file location).
+ */
+object BlobType {
+ val dataType: DataType = HoodieSparkSchemaConverters.toSqlType(HoodieSchema.createBlob())._1
+
+ /**
+ * Creates a StructType representing a {@link HoodieSchema.Blob}.
+ *
+ * @return StructType with blob structure
+ */
+ def apply(): DataType = dataType
+}
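For context (not part of the patch): a rough sketch of how BlobType is meant to be consumed, mirroring the assertions in TestCreateTable further below; the table and column names are hypothetical and spark is assumed to be an active SparkSession:

    import org.apache.spark.sql.types.{BlobType, StructType}

    val videoField = spark.table("blob_demo").schema("video")
    // A column declared as BLOB in DDL resolves to the StructType that BlobType()
    // derives from HoodieSchema.createBlob().
    assert(videoField.dataType.isInstanceOf[StructType])
    assert(videoField.dataType == BlobType())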
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala
new file mode 100644
index 000000000000..8783422c51b4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+
+import java.io.File
+import java.nio.file.{Files, Path}
+
+object BlobTestHelpers {
+ def blobMetadata: Metadata = {
+ new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+ .build()
+ }
+
+ def inlineBlobStructCol(name: String, bytesCol: Column): Column = {
+ struct(
+ lit(HoodieSchema.Blob.INLINE).as(HoodieSchema.Blob.TYPE),
+ bytesCol.as(HoodieSchema.Blob.INLINE_DATA_FIELD),
+ lit(null).cast("struct<externalPath:string,offset:bigint,length:bigint,managed:boolean>")
+ .as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+ ).as(name, blobMetadata)
+ }
+
+ def wholeFileBlobStructCol(name: String, filePathCol: Column): Column = {
+ struct(
+ lit(HoodieSchema.Blob.OUT_OF_LINE).as(HoodieSchema.Blob.TYPE),
+ lit(null).cast("binary").as(HoodieSchema.Blob.INLINE_DATA_FIELD),
+ struct(
+ filePathCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH),
+ lit(null).cast("bigint").as(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET),
+ lit(null).cast("bigint").as(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH),
+ lit(false).as(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED)
+ ).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+ ).as(name, blobMetadata)
+ }
+
+ def blobStructCol(name: String, filePathCol: Column, offsetCol: Column, lengthCol: Column): Column = {
+ struct(
+ lit(HoodieSchema.Blob.OUT_OF_LINE).as(HoodieSchema.Blob.TYPE),
+ lit(null).cast("binary").as(HoodieSchema.Blob.INLINE_DATA_FIELD),
+ struct(
+ filePathCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH),
+ offsetCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET),
+ lengthCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH),
+ lit(false).as(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED)
+ ).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+ ).as(name, blobMetadata)
+ }
+
+ def createTestFile(tempDir: Path, name: String, size: Int): String = {
+ val file = new File(tempDir.toString, name)
+ val bytes = (0 until size).map(i => (i % 256).toByte).toArray
+ Files.write(file.toPath, bytes)
+ file.getAbsolutePath
+ }
+
+ /**
+ * Assert that byte array contains expected pattern (i % 256) at given offset.
+ *
+ * @param data Array of bytes to verify
+ * @param expectedOffset Starting offset for pattern (default 0)
+ */
+ def assertBytesContent(data: Array[Byte], expectedOffset: Int = 0): Unit = {
+ for (i <- 0 until data.length) {
+ assertEquals((expectedOffset + i) % 256, data(i) & 0xFF,
+ s"Mismatch at byte $i (global offset ${expectedOffset + i})")
+ }
+ }
+
+ /**
+ * Execute code block with temporary Spark configuration.
+ * Automatically restores previous values after execution.
+ *
+ * @param spark SparkSession instance
+ * @param configs Configuration key-value pairs to set
+ * @param fn Code block to execute with configs
+ */
+ def withSparkConfig[T](spark: SparkSession, configs: Map[String, String])(fn: => T): T = {
+ val oldValues = configs.keys.map(k => (k, spark.conf.getOption(k))).toMap
+ try {
+ configs.foreach { case (k, v) => spark.conf.set(k, v) }
+ fn
+ } finally {
+ oldValues.foreach { case (k, oldValue) =>
+ oldValue match {
+ case Some(v) => spark.conf.set(k, v)
+ case None => spark.conf.unset(k)
+ }
+ }
+ }
+ }
+
+ /**
+ * Assert that DataFrame contains all specified columns.
+ *
+ * @param df DataFrame to check
+ * @param columnNames Variable number of column names
+ */
+ def assertColumnsExist(df: DataFrame, columnNames: String*): Unit = {
+ columnNames.foreach { colName =>
+ assertTrue(df.columns.contains(colName),
+ s"DataFrame missing expected column: $colName. Available: ${df.columns.mkString(", ")}")
+ }
+ }
+}
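For context (not part of the patch): a sketch of combining these helpers to build a DataFrame with an inline blob column, again assuming an active SparkSession named spark:

    import org.apache.hudi.blob.BlobTestHelpers._
    import org.apache.spark.sql.functions.{col, lit}

    // Wrap a binary column into the inline blob struct; the helper attaches the
    // Hudi BLOB type metadata so downstream code recognizes the column as a blob.
    val df = spark.range(5)
      .withColumn("payload", lit(Array[Byte](1, 2, 3)))
      .select(col("id"), inlineBlobStructCol("data", col("payload")))
    df.printSchema()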
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBlobSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBlobSupport.scala
new file mode 100644
index 000000000000..22e5eee77773
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBlobSupport.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.SparkDatasetMixin
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieFileFormat, HoodieKey, HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, HoodieSchemaType}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.{Arrays, Properties}
+
+import scala.collection.JavaConverters._
+
+/**
+ * End-to-end test for blob support including schema creation, multi-commit writes,
+ * DataFrame reads, and SQL blob retrieval.
+ *
+ * This test validates:
+ * <ul>
+ * <li>Creating tables with blob columns</li>
+ * <li>Writing blob records with out-of-line references</li>
+ * <li>Multi-commit operations (insert + upsert)</li>
+ * <li>Reading blob references via DataFrame</li>
+ * <li>SQL read_blob() function integration</li>
+ * </ul>
+ */
+class TestBlobSupport extends HoodieClientTestBase with SparkDatasetMixin {
+ val SCHEMA: HoodieSchema = HoodieSchema.createRecord("test_blobs", null, null, Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("value", HoodieSchema.create(HoodieSchemaType.INT), null, null),
+ HoodieSchemaField.of("data", HoodieSchema.createBlob(), null, null)
+ ))
+
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testEndToEnd(tableType: HoodieTableType): Unit = {
+ val filePath1 = createTestFile(tempDir, "file1.bin", 1000)
+ val filePath2 = createTestFile(tempDir, "file2.bin", 1000)
+
+ val properties = new Properties()
+ properties.put("hoodie.datasource.write.recordkey.field", "id")
+ properties.put("hoodie.datasource.write.partitionpath.field", "")
+ properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "id")
+ properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "")
+ properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString)
+
+ // Initialize table
+ HoodieTableMetaClient.newTableBuilder()
+ .setTableName("test_blob_table")
+ .setTableType(tableType)
+ .fromProperties(properties)
+ .initTable(storageConf, basePath)
+
+ var client: SparkRDDWriteClient[IndexedRecord] = null
+ val config = getConfigBuilder(SCHEMA.toString())
+ .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.SIMPLE).build)
+ .build()
+ try {
+ client = getHoodieWriteClient(config).asInstanceOf[SparkRDDWriteClient[IndexedRecord]]
+
+ // First commit - insert
+ val commit1 = client.startCommit()
+ val firstBatch = createTestRecords(filePath1)
+ val statuses1 = client.insert(jsc.parallelize(firstBatch.asJava, 1), commit1).collect()
+ client.commit(commit1, jsc.parallelize(statuses1, 1))
+
+ // Second commit - upsert
+ val commit2 = client.startCommit()
+ val secondBatch = createTestRecords(filePath2)
+ val statuses2 = client.upsert(jsc.parallelize(secondBatch.asJava, 1), commit2).collect()
+ client.commit(commit2, jsc.parallelize(statuses2, 1))
+ } finally {
+ if (client != null) client.close()
+ }
+
+ // Read and verify DataFrame results
+ val table = sparkSession.read.format("hudi").load(basePath)
+ val rows = table.collectAsList()
+ assertEquals(10, rows.size())
+
+ rows.asScala.foreach { row =>
+ val data = row.getStruct(row.fieldIndex("data"))
+ val reference = data.getStruct(data.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))
+ val filePath = reference.getString(reference.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH))
+ assertTrue(filePath.endsWith("file2.bin"))
+ }
+ }
+
+ private def createTestRecords(filePath: String): Seq[HoodieRecord[IndexedRecord]] = {
+ (0 until 10).map { i =>
+ val id = s"id_$i"
+ val key = new HoodieKey(id, "")
+
+ val dataSchema = SCHEMA.getField("data").get.schema
+ val fileReference = new GenericData.Record(dataSchema.getField(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+ .get.getNonNullSchema.toAvroSchema)
+ fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, filePath)
+ fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, i * 100L)
+ fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, 100L)
+ fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, false)
+
+ val blobRecord = new GenericData.Record(dataSchema.toAvroSchema)
+ blobRecord.put(HoodieSchema.Blob.TYPE, new GenericData.EnumSymbol(dataSchema.getField(HoodieSchema.Blob.TYPE)
+ .get.schema.toAvroSchema, HoodieSchema.Blob.OUT_OF_LINE))
+ blobRecord.put(HoodieSchema.Blob.EXTERNAL_REFERENCE, fileReference)
+
+ val record = new GenericData.Record(SCHEMA.toAvroSchema)
+ record.put("id", id)
+ record.put("value", i)
+ record.put("data", blobRecord)
+
+ new HoodieAvroIndexedRecord(key, record)
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index cf29c75edaca..5cd5dbb0d54b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.ddl
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
import org.apache.hudi.config.HoodieWriteConfig
@@ -1996,4 +1997,127 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
HoodieSparkSqlTestBase.enableComplexKeygenValidation(spark, tableName)
checkAnswer(query)(expectedRowsAfter: _*)
}
+
+ test("test create table with BLOB column") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id BIGINT,
+ | video BLOB COMMENT 'Product demonstration video'
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id'
+ |)
+ """.stripMargin)
+
+ // Verify schema has hudi_blob metadata
+ val schema = spark.table(tableName).schema
+ val videoField = schema.find(_.name == "video").get
+ assertTrue(videoField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.BLOB.name(), videoField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals("Product demonstration video", videoField.metadata.getString("comment"))
+
+ // Verify structure matches blob schema
+ assertTrue(videoField.dataType.isInstanceOf[StructType])
+ assertEquals(BlobType(), videoField.dataType)
+ }
+ }
+
+ test("test create table with multiple BLOB columns") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id BIGINT,
+ | video BLOB,
+ | thumbnail blob,
+ | metadata MAP<STRING, STRING>,
+ | audio BLOB NOT NULL
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id'
+ |)
+ """.stripMargin)
+
+ val schema = spark.table(tableName).schema
+
+ // Verify all BLOB columns have the metadata
+ val blobColumns = Seq("video", "thumbnail", "audio")
+ blobColumns.foreach { colName =>
+ val field = schema.find(_.name == colName).get
+ assertTrue(field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.BLOB.name(), field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertTrue(field.dataType.isInstanceOf[StructType])
+
+ if (colName == "audio") {
+ assertFalse(field.nullable)
+ } else {
+ assertTrue(field.nullable)
+ }
+
+ val blobStruct = field.dataType.asInstanceOf[StructType]
+ assertEquals(BlobType(), blobStruct)
+ }
+ }
+ }
+
+ test("test BLOB in nested struct") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id BIGINT,
+ | media STRUCT<title: STRING, content: BLOB>
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id'
+ |)
+ """.stripMargin)
+
+ val schema = spark.table(tableName).schema
+ val mediaField = schema.find(_.name == "media").get
+ assertTrue(mediaField.dataType.isInstanceOf[StructType])
+
+ val mediaStruct = mediaField.dataType.asInstanceOf[StructType]
+ val contentField = mediaStruct.find(_.name == "content").get
+
+ // Verify nested BLOB has metadata
+ assertTrue(contentField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.BLOB.name(), contentField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+ // Verify structure
+ assertTrue(contentField.dataType.isInstanceOf[StructType])
+ val blobStruct = contentField.dataType.asInstanceOf[StructType]
+ assertEquals(BlobType(), blobStruct)
+ }
+ }
+
+ test("test create field with blob as name but not type") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id BIGINT,
+ | blob_path STRING
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id'
+ |)
+ """.stripMargin)
+
+ // ensure that blob in the name does not cause any regressions in parsing
+ val schema = spark.table(tableName).schema
+ val blobPathField = schema.find(_.name == "blob_path").get
+ assertTrue(blobPathField.dataType.isInstanceOf[StringType])
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
index ce51cfab5b1b..ec357ccb7b1a 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.parser
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
@@ -40,6 +41,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils.isTesting
import org.apache.spark.util.random.RandomSampler
@@ -2603,6 +2605,7 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
case ("character" | "char", length :: Nil) =>
CharType(length.getText.toInt)
case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
case ("binary", Nil) => BinaryType
+ case ("blob", Nil) => BlobType()
case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
case ("decimal" | "dec" | "numeric", precision :: Nil) =>
DecimalType(precision.getText.toInt, 0)
@@ -2686,13 +2689,24 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
builder.putString("comment", _)
}
+ val dataType = typedVisit[DataType](ctx.dataType)
+
+ addMetadataForType(ctx.dataType(), builder)
+
StructField(
name = colName.getText,
- dataType = typedVisit[DataType](ctx.dataType),
+ dataType = dataType,
nullable = NULL == null,
metadata = builder.build())
}
+ private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+ val typeText = dataType.getText
+ if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+ builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+ }
+ }
+
/**
* Create a [[StructType]] from a sequence of [[StructField]]s.
*/
@@ -2713,11 +2727,18 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
*/
override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
import ctx._
- val structField = StructField(
+ val builder = new MetadataBuilder
+ // Add comment to metadata
+ Option(commentSpec()).map(visitCommentSpec).foreach {
+ builder.putString("comment", _)
+ }
+ addMetadataForType(ctx.dataType(), builder)
+
+ StructField(
name = identifier.getText,
dataType = typedVisit(dataType()),
- nullable = NULL == null)
- Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+ nullable = NULL == null,
+ metadata = builder.build())
}
/**
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
index 4b98a0fd11dc..e10df36d2faf 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
@@ -128,7 +128,8 @@ class HoodieSpark3_3ExtendedSqlParser(session: SparkSession, delegate: ParserInt
normalized.contains("create index") ||
normalized.contains("drop index") ||
normalized.contains("show indexes") ||
- normalized.contains("refresh index")
+ normalized.contains("refresh index") ||
+ normalized.contains(" blob")
}
}
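For context (not part of the patch): the one-line parser change above, repeated for each Spark profile below, only affects statement routing. A toy illustration with a made-up statement (the surrounding method lower-cases and normalizes the SQL text before this check):

    val normalized = "create table t (id bigint, video blob) using hudi"
    // Statements containing " blob" are now routed to the Hudi-extended parser,
    // whose AST builder maps the BLOB keyword to BlobType().
    val routedToHudiParser = normalized.contains(" blob") // true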
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
index ba62f1c719cc..fb995babe70d 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.parser
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
@@ -40,6 +41,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils.isTesting
import org.apache.spark.util.random.RandomSampler
@@ -2606,6 +2608,7 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
case ("character" | "char", length :: Nil) =>
CharType(length.getText.toInt)
case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
case ("binary", Nil) => BinaryType
+ case ("blob", Nil) => BlobType()
case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
case ("decimal" | "dec" | "numeric", precision :: Nil) =>
DecimalType(precision.getText.toInt, 0)
@@ -2689,13 +2692,24 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
builder.putString("comment", _)
}
+ val dataType = typedVisit[DataType](ctx.dataType)
+
+ addMetadataForType(ctx.dataType(), builder)
+
StructField(
name = colName.getText,
- dataType = typedVisit[DataType](ctx.dataType),
+ dataType = dataType,
nullable = NULL == null,
metadata = builder.build())
}
+ private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+ val typeText = dataType.getText
+ if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+ builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+ }
+ }
+
/**
* Create a [[StructType]] from a sequence of [[StructField]]s.
*/
@@ -2716,11 +2730,18 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
*/
override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
import ctx._
- val structField = StructField(
+ val builder = new MetadataBuilder
+ // Add comment to metadata
+ Option(commentSpec()).map(visitCommentSpec).foreach {
+ builder.putString("comment", _)
+ }
+ addMetadataForType(ctx.dataType(), builder)
+
+ StructField(
name = identifier.getText,
dataType = typedVisit(dataType()),
- nullable = NULL == null)
- Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+ nullable = NULL == null,
+ metadata = builder.build())
}
/**
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
index bb7b255edd81..267e582bf2a1 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
@@ -128,7 +128,8 @@ class HoodieSpark3_4ExtendedSqlParser(session: SparkSession, delegate: ParserInt
normalized.contains("create index") ||
normalized.contains("drop index") ||
normalized.contains("show indexes") ||
- normalized.contains("refresh index")
+ normalized.contains("refresh index") ||
+ normalized.contains(" blob")
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala
index c31a3cb3e6f6..653a19984490 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.parser
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
@@ -41,6 +42,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils.isTesting
import org.apache.spark.util.random.RandomSampler
@@ -2607,6 +2609,7 @@ class HoodieSpark3_5ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
case ("character" | "char", length :: Nil) =>
CharType(length.getText.toInt)
case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
case ("binary", Nil) => BinaryType
+ case ("blob", Nil) => BlobType()
case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
case ("decimal" | "dec" | "numeric", precision :: Nil) =>
DecimalType(precision.getText.toInt, 0)
@@ -2690,13 +2693,24 @@ class HoodieSpark3_5ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
builder.putString("comment", _)
}
+ val dataType = typedVisit[DataType](ctx.dataType)
+
+ addMetadataForType(ctx.dataType(), builder)
+
StructField(
name = colName.getText,
- dataType = typedVisit[DataType](ctx.dataType),
+ dataType = dataType,
nullable = NULL == null,
metadata = builder.build())
}
+ private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+ val typeText = dataType.getText
+ if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+ builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+ }
+ }
+
/**
* Create a [[StructType]] from a sequence of [[StructField]]s.
*/
@@ -2717,11 +2731,18 @@ class HoodieSpark3_5ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
*/
override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
import ctx._
- val structField = StructField(
+ val builder = new MetadataBuilder
+ // Add comment to metadata
+ Option(commentSpec()).map(visitCommentSpec).foreach {
+ builder.putString("comment", _)
+ }
+ addMetadataForType(ctx.dataType(), builder)
+
+ StructField(
name = identifier.getText,
dataType = typedVisit(dataType()),
- nullable = NULL == null)
- Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+ nullable = NULL == null,
+ metadata = builder.build())
}
/**
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala
index 9aeb29263e75..3c3bc06736cb 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala
@@ -128,7 +128,8 @@ class HoodieSpark3_5ExtendedSqlParser(session: SparkSession, delegate: ParserInt
normalized.contains("create index") ||
normalized.contains("drop index") ||
normalized.contains("show indexes") ||
- normalized.contains("refresh index")
+ normalized.contains("refresh index") ||
+ normalized.contains(" blob")
}
}
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala
index 5c34062728bc..d8d3faa0dfca 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.parser
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
@@ -41,6 +42,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils.isTesting
import org.apache.spark.util.random.RandomSampler
@@ -2609,6 +2611,7 @@ class HoodieSpark4_0ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
case ("character" | "char", length :: Nil) =>
CharType(length.getText.toInt)
case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
case ("binary", Nil) => BinaryType
+ case ("blob", Nil) => BlobType()
case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
case ("decimal" | "dec" | "numeric", precision :: Nil) =>
DecimalType(precision.getText.toInt, 0)
@@ -2692,13 +2695,24 @@ class HoodieSpark4_0ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
builder.putString("comment", _)
}
+ val dataType = typedVisit[DataType](ctx.dataType)
+
+ addMetadataForType(ctx.dataType(), builder)
+
StructField(
name = colName.getText,
- dataType = typedVisit[DataType](ctx.dataType),
+ dataType = dataType,
nullable = NULL == null,
metadata = builder.build())
}
+ private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+ val typeText = dataType.getText
+ if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+ builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+ }
+ }
+
/**
* Create a [[StructType]] from a sequence of [[StructField]]s.
*/
@@ -2719,11 +2733,18 @@ class HoodieSpark4_0ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
*/
override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
import ctx._
- val structField = StructField(
+ val builder = new MetadataBuilder
+ // Add comment to metadata
+ Option(commentSpec()).map(visitCommentSpec).foreach {
+ builder.putString("comment", _)
+ }
+ addMetadataForType(ctx.dataType(), builder)
+
+ StructField(
name = identifier.getText,
dataType = typedVisit(dataType()),
- nullable = NULL == null)
- Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+ nullable = NULL == null,
+ metadata = builder.build())
}
/**
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala
index a9e85667ee29..f44002ab0265 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala
@@ -137,7 +137,8 @@ class HoodieSpark4_0ExtendedSqlParser(session: SparkSession, delegate: ParserInt
normalized.contains("create index") ||
normalized.contains("drop index") ||
normalized.contains("show indexes") ||
- normalized.contains("refresh index")
+ normalized.contains("refresh index") ||
+ normalized.contains(" blob")
}
override def parseRoutineParam(sqlText: String): StructType = throw new UnsupportedOperationException()