This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 75918eabcb33 feat(blob): Create blobs in Spark SQL (#18347)
75918eabcb33 is described below

commit 75918eabcb33061d81f23a237192d7ab4bdb8c07
Author: Tim Brown <[email protected]>
AuthorDate: Mon Mar 30 23:34:29 2026 -0400

    feat(blob): Create blobs in Spark SQL (#18347)
    
    * add in code and test for handling blob type in SQL table creation
    
    * remove spacing changes
    
    * add basic testing, cleanup constants in HoodieSchema
    
    * make blob type return a constant, add test for blob keyword without blob type
    
    * style
    
    * fix style
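    
    For illustration, a minimal sketch of the DDL this change enables, modeled
    on the tests added in this commit (the table name is hypothetical):
    
        spark.sql(
          """
            |CREATE TABLE media_assets (
            |  id BIGINT,
            |  video BLOB COMMENT 'Product demonstration video'
            |) USING hudi
            |TBLPROPERTIES (
            |  primaryKey = 'id'
            |)
            """.stripMargin)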
---
 .../apache/hudi/common/schema/HoodieSchema.java    |   7 +-
 .../avro/AvroSchemaConverterWithTimestampNTZ.java  |   1 +
 .../org/apache/spark/sql/types/BlobType.scala      |  41 ++++++
 .../org/apache/hudi/blob/BlobTestHelpers.scala     | 129 +++++++++++++++++++
 .../org/apache/hudi/blob/TestBlobSupport.scala     | 142 +++++++++++++++++++++
 .../spark/sql/hudi/ddl/TestCreateTable.scala       | 124 ++++++++++++++++++
 .../HoodieSpark3_3ExtendedSqlAstBuilder.scala      |  29 ++++-
 .../parser/HoodieSpark3_3ExtendedSqlParser.scala   |   3 +-
 .../HoodieSpark3_4ExtendedSqlAstBuilder.scala      |  29 ++++-
 .../parser/HoodieSpark3_4ExtendedSqlParser.scala   |   3 +-
 .../HoodieSpark3_5ExtendedSqlAstBuilder.scala      |  29 ++++-
 .../parser/HoodieSpark3_5ExtendedSqlParser.scala   |   3 +-
 .../HoodieSpark4_0ExtendedSqlAstBuilder.scala      |  29 ++++-
 .../parser/HoodieSpark4_0ExtendedSqlParser.scala   |   3 +-
 14 files changed, 550 insertions(+), 22 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 01a0963a6623..e20aeea24911 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -2548,7 +2548,10 @@ public class HoodieSchema implements Serializable {
     public static final String TYPE_DESCRIPTOR = "BLOB";
     private static final String DEFAULT_NAME = "blob";
     private static final List<Schema.Field> BLOB_FIELDS = createBlobFields();
+    private static final int REFERENCE_FIELD_COUNT = AvroSchemaUtils.getNonNullTypeFromUnion(BLOB_FIELDS.get(2).schema()).getFields().size();
 
+    public static final String INLINE = "INLINE";
+    public static final String OUT_OF_LINE = "OUT_OF_LINE";
     public static final String TYPE = "type";
     public static final String INLINE_DATA_FIELD = "data";
     public static final String EXTERNAL_REFERENCE = "reference";
@@ -2564,7 +2567,7 @@ public class HoodieSchema implements Serializable {
     }
 
     public static int getReferenceFieldCount() {
-      return AvroSchemaUtils.getNonNullTypeFromUnion(BLOB_FIELDS.get(2).schema()).getFields().size();
+      return REFERENCE_FIELD_COUNT;
     }
 
     /**
@@ -2622,7 +2625,7 @@ public class HoodieSchema implements Serializable {
       referenceField.setFields(referenceFields);
 
       return Arrays.asList(
-          new Schema.Field(TYPE, Schema.createEnum("blob_storage_type", null, null, Arrays.asList("INLINE", "OUT_OF_LINE")), null, null),
+          new Schema.Field(TYPE, Schema.createEnum("blob_storage_type", null, null, Arrays.asList(INLINE, OUT_OF_LINE)), null, null),
           new Schema.Field(INLINE_DATA_FIELD, AvroSchemaUtils.createNullableSchema(bytesField), null, Schema.Field.NULL_DEFAULT_VALUE),
           new Schema.Field(EXTERNAL_REFERENCE, AvroSchemaUtils.createNullableSchema(referenceField), null, Schema.Field.NULL_DEFAULT_VALUE)
       );
diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
index 6641e70cbaa3..8895186fd055 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
@@ -221,6 +221,7 @@ public class AvroSchemaConverterWithTimestampNTZ extends HoodieAvroParquetSchema
         }
         break;
       case RECORD:
+      case BLOB:
         return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath));
       case ENUM:
         builder = Types.primitive(BINARY, repetition).as(enumType());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/types/BlobType.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/types/BlobType.scala
new file mode 100644
index 000000000000..9be8e9b7a008
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/types/BlobType.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.hudi.common.schema.HoodieSchema
+
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
+
+/**
+ * Factory object for creating Spark StructType representation of {@link HoodieSchema.Blob}.
+ *
+ * BLOB is a logical type in Hudi that represents binary large objects. The data can be
+ * stored inline (as bytes) or out-of-line (as a reference to a file location).
+ */
+object BlobType {
+  val dataType: DataType = HoodieSparkSchemaConverters.toSqlType(HoodieSchema.createBlob())._1
+
+  /**
+   * Creates a StructType representing a {@link HoodieSchema.Blob}.
+   *
+   * @return StructType with blob structure
+   */
+  def apply(): DataType = dataType
+}
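
For reference, a brief sketch of how the new factory behaves (the field names
mirror the constants added to HoodieSchema in this commit; the assertion is
illustrative):

    import org.apache.spark.sql.types.{BlobType, StructType}

    // BlobType() resolves to the StructType converted from HoodieSchema.createBlob():
    // a struct with a "type" enum plus nullable "data" (inline bytes) and
    // "reference" (out-of-line file pointer) fields.
    val blobStruct = BlobType().asInstanceOf[StructType]
    assert(blobStruct.fieldNames.toSeq == Seq("type", "data", "reference"))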
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala
new file mode 100644
index 000000000000..8783422c51b4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+
+import java.io.File
+import java.nio.file.{Files, Path}
+
+object BlobTestHelpers {
+  def blobMetadata: Metadata = {
+    new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+      .build()
+  }
+
+  def inlineBlobStructCol(name: String, bytesCol: Column): Column = {
+    struct(
+      lit(HoodieSchema.Blob.INLINE).as(HoodieSchema.Blob.TYPE),
+      bytesCol.as(HoodieSchema.Blob.INLINE_DATA_FIELD),
+      lit(null).cast("struct<externalPath:string,offset:bigint,length:bigint,managed:boolean>")
+        .as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    ).as(name, blobMetadata)
+  }
+
+  def wholeFileBlobStructCol(name: String, filePathCol: Column): Column = {
+    struct(
+      lit(HoodieSchema.Blob.OUT_OF_LINE).as(HoodieSchema.Blob.TYPE),
+      lit(null).cast("binary").as(HoodieSchema.Blob.INLINE_DATA_FIELD),
+      struct(
+        filePathCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH),
+        lit(null).cast("bigint").as(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET),
+        lit(null).cast("bigint").as(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH),
+        lit(false).as(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED)
+      ).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    ).as(name, blobMetadata)
+  }
+
+  def blobStructCol(name: String, filePathCol: Column, offsetCol: Column, lengthCol: Column): Column = {
+    struct(
+      lit(HoodieSchema.Blob.OUT_OF_LINE).as(HoodieSchema.Blob.TYPE),
+      lit(null).cast("binary").as(HoodieSchema.Blob.INLINE_DATA_FIELD),
+      struct(
+        filePathCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH),
+        offsetCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET),
+        lengthCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH),
+        lit(false).as(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED)
+      ).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    ).as(name, blobMetadata)
+  }
+
+  def createTestFile(tempDir: Path, name: String, size: Int): String = {
+    val file = new File(tempDir.toString, name)
+    val bytes = (0 until size).map(i => (i % 256).toByte).toArray
+    Files.write(file.toPath, bytes)
+    file.getAbsolutePath
+  }
+
+  /**
+   * Assert that byte array contains expected pattern (i % 256) at given offset.
+   *
+   * @param data Array of bytes to verify
+   * @param expectedOffset Starting offset for pattern (default 0)
+   */
+  def assertBytesContent(data: Array[Byte], expectedOffset: Int = 0): Unit = {
+    for (i <- 0 until data.length) {
+      assertEquals((expectedOffset + i) % 256, data(i) & 0xFF,
+        s"Mismatch at byte $i (global offset ${expectedOffset + i})")
+    }
+  }
+
+  /**
+   * Execute code block with temporary Spark configuration.
+   * Automatically restores previous values after execution.
+   *
+   * @param spark SparkSession instance
+   * @param configs Configuration key-value pairs to set
+   * @param fn Code block to execute with configs
+   */
+  def withSparkConfig[T](spark: SparkSession, configs: Map[String, String])(fn: => T): T = {
+    val oldValues = configs.keys.map(k => (k, spark.conf.getOption(k))).toMap
+    try {
+      configs.foreach { case (k, v) => spark.conf.set(k, v) }
+      fn
+    } finally {
+      oldValues.foreach { case (k, oldValue) =>
+        oldValue match {
+          case Some(v) => spark.conf.set(k, v)
+          case None => spark.conf.unset(k)
+        }
+      }
+    }
+  }
+
+  /**
+   * Assert that DataFrame contains all specified columns.
+   *
+   * @param df DataFrame to check
+   * @param columnNames Variable number of column names
+   */
+  def assertColumnsExist(df: DataFrame, columnNames: String*): Unit = {
+    columnNames.foreach { colName =>
+      assertTrue(df.columns.contains(colName),
+        s"DataFrame missing expected column: $colName. Available: 
${df.columns.mkString(", ")}")
+    }
+  }
+}
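
A short usage sketch for the helpers above (the DataFrame contents are
illustrative; `spark` is assumed to be an active SparkSession):

    import org.apache.hudi.blob.BlobTestHelpers.inlineBlobStructCol
    import org.apache.spark.sql.functions.col

    // Wrap raw bytes into the blob struct (type = INLINE, data = bytes,
    // reference = null) and tag the column with the blob metadata.
    val df = spark.createDataFrame(Seq(("id_0", Array[Byte](1, 2, 3))))
      .toDF("id", "raw_bytes")
      .select(col("id"), inlineBlobStructCol("data", col("raw_bytes")))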
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBlobSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBlobSupport.scala
new file mode 100644
index 000000000000..22e5eee77773
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBlobSupport.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.SparkDatasetMixin
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieFileFormat, HoodieKey, HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, HoodieSchemaType}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.{Arrays, Properties}
+
+import scala.collection.JavaConverters._
+
+/**
+ * End-to-end test for blob support including schema creation, multi-commit writes,
+ * DataFrame reads, and SQL blob retrieval.
+ *
+ * This test validates:
+ * <ul>
+ *   <li>Creating tables with blob columns</li>
+ *   <li>Writing blob records with out-of-line references</li>
+ *   <li>Multi-commit operations (insert + upsert)</li>
+ *   <li>Reading blob references via DataFrame</li>
+ *   <li>SQL read_blob() function integration</li>
+ * </ul>
+ */
+class TestBlobSupport extends HoodieClientTestBase with SparkDatasetMixin {
+  val SCHEMA: HoodieSchema = HoodieSchema.createRecord("test_blobs", null, null, Arrays.asList(
+    HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+    HoodieSchemaField.of("value", HoodieSchema.create(HoodieSchemaType.INT), null, null),
+    HoodieSchemaField.of("data", HoodieSchema.createBlob(), null, null)
+  ))
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testEndToEnd(tableType: HoodieTableType): Unit = {
+    val filePath1 = createTestFile(tempDir, "file1.bin", 1000)
+    val filePath2 = createTestFile(tempDir, "file2.bin", 1000)
+
+    val properties = new Properties()
+    properties.put("hoodie.datasource.write.recordkey.field", "id")
+    properties.put("hoodie.datasource.write.partitionpath.field", "")
+    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "id")
+    properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "")
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString)
+
+    // Initialize table
+    HoodieTableMetaClient.newTableBuilder()
+      .setTableName("test_blob_table")
+      .setTableType(tableType)
+      .fromProperties(properties)
+      .initTable(storageConf, basePath)
+
+    var client: SparkRDDWriteClient[IndexedRecord] = null
+    val config = getConfigBuilder(SCHEMA.toString())
+      .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.SIMPLE).build)
+      .build()
+    try {
+      client = getHoodieWriteClient(config).asInstanceOf[SparkRDDWriteClient[IndexedRecord]]
+
+      // First commit - insert
+      val commit1 = client.startCommit()
+      val firstBatch = createTestRecords(filePath1)
+      val statuses1 = client.insert(jsc.parallelize(firstBatch.asJava, 1), commit1).collect()
+      client.commit(commit1, jsc.parallelize(statuses1, 1))
+
+      // Second commit - upsert
+      val commit2 = client.startCommit()
+      val secondBatch = createTestRecords(filePath2)
+      val statuses2 = client.upsert(jsc.parallelize(secondBatch.asJava, 1), commit2).collect()
+      client.commit(commit2, jsc.parallelize(statuses2, 1))
+    } finally {
+      if (client != null) client.close()
+    }
+
+    // Read and verify DataFrame results
+    val table = sparkSession.read.format("hudi").load(basePath)
+    val rows = table.collectAsList()
+    assertEquals(10, rows.size())
+
+    rows.asScala.foreach { row =>
+      val data = row.getStruct(row.fieldIndex("data"))
+      val reference = data.getStruct(data.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))
+      val filePath = reference.getString(reference.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH))
+      assertTrue(filePath.endsWith("file2.bin"))
+    }
+  }
+
+  private def createTestRecords(filePath: String): Seq[HoodieRecord[IndexedRecord]] = {
+    (0 until 10).map { i =>
+      val id = s"id_$i"
+      val key = new HoodieKey(id, "")
+
+      val dataSchema = SCHEMA.getField("data").get.schema
+      val fileReference = new GenericData.Record(dataSchema.getField(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+        .get.getNonNullSchema.toAvroSchema)
+      fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, filePath)
+      fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, i * 100L)
+      fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, 100L)
+      fileReference.put(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, false)
+
+      val blobRecord = new GenericData.Record(dataSchema.toAvroSchema)
+      blobRecord.put(HoodieSchema.Blob.TYPE, new GenericData.EnumSymbol(dataSchema.getField(HoodieSchema.Blob.TYPE)
+        .get.schema.toAvroSchema, HoodieSchema.Blob.OUT_OF_LINE))
+      blobRecord.put(HoodieSchema.Blob.EXTERNAL_REFERENCE, fileReference)
+
+      val record = new GenericData.Record(SCHEMA.toAvroSchema)
+      record.put("id", id)
+      record.put("value", i)
+      record.put("data", blobRecord)
+
+      new HoodieAvroIndexedRecord(key, record)
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index cf29c75edaca..5cd5dbb0d54b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.ddl
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
 import org.apache.hudi.config.HoodieWriteConfig
@@ -1996,4 +1997,127 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     HoodieSparkSqlTestBase.enableComplexKeygenValidation(spark, tableName)
     checkAnswer(query)(expectedRowsAfter: _*)
   }
+
+  test("test create table with BLOB column") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  id BIGINT,
+           |  video BLOB COMMENT 'Product demonstration video'
+           |) USING hudi
+           |LOCATION '${tmp.getCanonicalPath}'
+           |TBLPROPERTIES (
+           |  primaryKey = 'id'
+           |)
+           """.stripMargin)
+
+      // Verify schema has hudi_blob metadata
+      val schema = spark.table(tableName).schema
+      val videoField = schema.find(_.name == "video").get
+      assertTrue(videoField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+      assertEquals(HoodieSchemaType.BLOB.name(), videoField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+      assertEquals("Product demonstration video", videoField.metadata.getString("comment"))
+
+      // Verify structure matches blob schema
+      assertTrue(videoField.dataType.isInstanceOf[StructType])
+      assertEquals(BlobType(), videoField.dataType)
+    }
+  }
+
+  test("test create table with multiple BLOB columns") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  id BIGINT,
+           |  video BLOB,
+           |  thumbnail blob,
+           |  metadata MAP<STRING, STRING>,
+           |  audio BLOB NOT NULL
+           |) USING hudi
+           |LOCATION '${tmp.getCanonicalPath}'
+           |TBLPROPERTIES (
+           |  primaryKey = 'id'
+           |)
+           """.stripMargin)
+
+      val schema = spark.table(tableName).schema
+
+      // Verify all BLOB columns have the metadata
+      val blobColumns = Seq("video", "thumbnail", "audio")
+      blobColumns.foreach { colName =>
+        val field = schema.find(_.name == colName).get
+        assertTrue(field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+        assertEquals(HoodieSchemaType.BLOB.name(), field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+        assertTrue(field.dataType.isInstanceOf[StructType])
+
+        if (colName == "audio") {
+          assertFalse(field.nullable)
+        } else {
+          assertTrue(field.nullable)
+        }
+
+        val blobStruct = field.dataType.asInstanceOf[StructType]
+        assertEquals(BlobType(), blobStruct)
+      }
+    }
+  }
+
+  test("test BLOB in nested struct") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  id BIGINT,
+           |  media STRUCT<title: STRING, content: BLOB>
+           |) USING hudi
+           |LOCATION '${tmp.getCanonicalPath}'
+           |TBLPROPERTIES (
+           |  primaryKey = 'id'
+           |)
+           """.stripMargin)
+
+      val schema = spark.table(tableName).schema
+      val mediaField = schema.find(_.name == "media").get
+      assertTrue(mediaField.dataType.isInstanceOf[StructType])
+
+      val mediaStruct = mediaField.dataType.asInstanceOf[StructType]
+      val contentField = mediaStruct.find(_.name == "content").get
+
+      // Verify nested BLOB has metadata
+      assertTrue(contentField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+      assertEquals(HoodieSchemaType.BLOB.name(), contentField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+      // Verify structure
+      assertTrue(contentField.dataType.isInstanceOf[StructType])
+      val blobStruct = contentField.dataType.asInstanceOf[StructType]
+      assertEquals(BlobType(), blobStruct)
+    }
+  }
+
+  test("test create field with blob as name but not type") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  id BIGINT,
+           |  blob_path STRING
+           |) USING hudi
+           |LOCATION '${tmp.getCanonicalPath}'
+           |TBLPROPERTIES (
+           |  primaryKey = 'id'
+           |)
+           """.stripMargin)
+
+      // ensure that blob in the name does not cause any regressions in parsing
+      val schema = spark.table(tableName).schema
+      val blobPathField = schema.find(_.name == "blob_path").get
+      assertTrue(blobPathField.dataType.isInstanceOf[StringType])
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
index ce51cfab5b1b..ec357ccb7b1a 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.parser
 
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
 import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
 import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
 
@@ -40,6 +41,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils.isTesting
 import org.apache.spark.util.random.RandomSampler
@@ -2603,6 +2605,7 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       case ("character" | "char", length :: Nil) => 
CharType(length.getText.toInt)
       case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
       case ("binary", Nil) => BinaryType
+      case ("blob", Nil) => BlobType()
       case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
       case ("decimal" | "dec" | "numeric", precision :: Nil) =>
         DecimalType(precision.getText.toInt, 0)
@@ -2686,13 +2689,24 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       builder.putString("comment", _)
     }
 
+    val dataType = typedVisit[DataType](ctx.dataType)
+
+    addMetadataForType(ctx.dataType(), builder)
+
     StructField(
       name = colName.getText,
-      dataType = typedVisit[DataType](ctx.dataType),
+      dataType = dataType,
       nullable = NULL == null,
       metadata = builder.build())
   }
 
+  private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+    val typeText = dataType.getText
+    if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+      builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+    }
+  }
+
   /**
    * Create a [[StructType]] from a sequence of [[StructField]]s.
    */
@@ -2713,11 +2727,18 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
    */
   override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    Option(commentSpec()).map(visitCommentSpec).foreach {
+      builder.putString("comment", _)
+    }
+    addMetadataForType(ctx.dataType(), builder)
+
+    StructField(
       name = identifier.getText,
       dataType = typedVisit(dataType()),
-      nullable = NULL == null)
-    Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+      nullable = NULL == null,
+      metadata = builder.build())
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
index 4b98a0fd11dc..e10df36d2faf 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
@@ -128,7 +128,8 @@ class HoodieSpark3_3ExtendedSqlParser(session: SparkSession, delegate: ParserInt
       normalized.contains("create index") ||
       normalized.contains("drop index") ||
       normalized.contains("show indexes") ||
-      normalized.contains("refresh index")
+      normalized.contains("refresh index") ||
+      normalized.contains(" blob")
   }
 }
 
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
index ba62f1c719cc..fb995babe70d 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.parser
 
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
 import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
 import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
 
@@ -40,6 +41,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils.isTesting
 import org.apache.spark.util.random.RandomSampler
@@ -2606,6 +2608,7 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       case ("character" | "char", length :: Nil) => 
CharType(length.getText.toInt)
       case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
       case ("binary", Nil) => BinaryType
+      case ("blob", Nil) => BlobType()
       case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
       case ("decimal" | "dec" | "numeric", precision :: Nil) =>
         DecimalType(precision.getText.toInt, 0)
@@ -2689,13 +2692,24 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       builder.putString("comment", _)
     }
 
+    val dataType = typedVisit[DataType](ctx.dataType)
+
+    addMetadataForType(ctx.dataType(), builder)
+
     StructField(
       name = colName.getText,
-      dataType = typedVisit[DataType](ctx.dataType),
+      dataType = dataType,
       nullable = NULL == null,
       metadata = builder.build())
   }
 
+  private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+    val typeText = dataType.getText
+    if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+      builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+    }
+  }
+
   /**
    * Create a [[StructType]] from a sequence of [[StructField]]s.
    */
@@ -2716,11 +2730,18 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
    */
   override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    Option(commentSpec()).map(visitCommentSpec).foreach {
+      builder.putString("comment", _)
+    }
+    addMetadataForType(ctx.dataType(), builder)
+
+    StructField(
       name = identifier.getText,
       dataType = typedVisit(dataType()),
-      nullable = NULL == null)
-    Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+      nullable = NULL == null,
+      metadata = builder.build())
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
index bb7b255edd81..267e582bf2a1 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
@@ -128,7 +128,8 @@ class HoodieSpark3_4ExtendedSqlParser(session: SparkSession, delegate: ParserInt
       normalized.contains("create index") ||
       normalized.contains("drop index") ||
       normalized.contains("show indexes") ||
-      normalized.contains("refresh index")
+      normalized.contains("refresh index") ||
+      normalized.contains(" blob")
   }
 }
 
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala
index c31a3cb3e6f6..653a19984490 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.parser
 
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
 import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
 import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
 
@@ -41,6 +42,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils.isTesting
 import org.apache.spark.util.random.RandomSampler
@@ -2607,6 +2609,7 @@ class HoodieSpark3_5ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       case ("character" | "char", length :: Nil) => 
CharType(length.getText.toInt)
       case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
       case ("binary", Nil) => BinaryType
+      case ("blob", Nil) => BlobType()
       case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
       case ("decimal" | "dec" | "numeric", precision :: Nil) =>
         DecimalType(precision.getText.toInt, 0)
@@ -2690,13 +2693,24 @@ class HoodieSpark3_5ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       builder.putString("comment", _)
     }
 
+    val dataType = typedVisit[DataType](ctx.dataType)
+
+    addMetadataForType(ctx.dataType(), builder)
+
     StructField(
       name = colName.getText,
-      dataType = typedVisit[DataType](ctx.dataType),
+      dataType = dataType,
       nullable = NULL == null,
       metadata = builder.build())
   }
 
+  private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+    val typeText = dataType.getText
+    if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+      builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+    }
+  }
+
   /**
    * Create a [[StructType]] from a sequence of [[StructField]]s.
    */
@@ -2717,11 +2731,18 @@ class HoodieSpark3_5ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
    */
   override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    Option(commentSpec()).map(visitCommentSpec).foreach {
+      builder.putString("comment", _)
+    }
+    addMetadataForType(ctx.dataType(), builder)
+
+    StructField(
       name = identifier.getText,
       dataType = typedVisit(dataType()),
-      nullable = NULL == null)
-    Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+      nullable = NULL == null,
+      metadata = builder.build())
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala
index 9aeb29263e75..3c3bc06736cb 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_5ExtendedSqlParser.scala
@@ -128,7 +128,8 @@ class HoodieSpark3_5ExtendedSqlParser(session: SparkSession, delegate: ParserInt
       normalized.contains("create index") ||
       normalized.contains("drop index") ||
       normalized.contains("show indexes") ||
-      normalized.contains("refresh index")
+      normalized.contains("refresh index") ||
+      normalized.contains(" blob")
   }
 }
 
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala
index 5c34062728bc..d8d3faa0dfca 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.parser
 
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
 import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
 import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
 
@@ -41,6 +42,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.BlobType
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils.isTesting
 import org.apache.spark.util.random.RandomSampler
@@ -2609,6 +2611,7 @@ class HoodieSpark4_0ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       case ("character" | "char", length :: Nil) => 
CharType(length.getText.toInt)
       case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
       case ("binary", Nil) => BinaryType
+      case ("blob", Nil) => BlobType()
       case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
       case ("decimal" | "dec" | "numeric", precision :: Nil) =>
         DecimalType(precision.getText.toInt, 0)
@@ -2692,13 +2695,24 @@ class HoodieSpark4_0ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
       builder.putString("comment", _)
     }
 
+    val dataType = typedVisit[DataType](ctx.dataType)
+
+    addMetadataForType(ctx.dataType(), builder)
+
     StructField(
       name = colName.getText,
-      dataType = typedVisit[DataType](ctx.dataType),
+      dataType = dataType,
       nullable = NULL == null,
       metadata = builder.build())
   }
 
+  private def addMetadataForType(dataType: HoodieSqlBaseParser.DataTypeContext, builder: MetadataBuilder): Unit = {
+    val typeText = dataType.getText
+    if (typeText.equalsIgnoreCase(HoodieSchemaType.BLOB.name())) {
+      builder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+    }
+  }
+
   /**
    * Create a [[StructType]] from a sequence of [[StructField]]s.
    */
@@ -2719,11 +2733,18 @@ class HoodieSpark4_0ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa
    */
   override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    Option(commentSpec()).map(visitCommentSpec).foreach {
+      builder.putString("comment", _)
+    }
+    addMetadataForType(ctx.dataType(), builder)
+
+    StructField(
       name = identifier.getText,
       dataType = typedVisit(dataType()),
-      nullable = NULL == null)
-    Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+      nullable = NULL == null,
+      metadata = builder.build())
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala
index a9e85667ee29..f44002ab0265 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_0ExtendedSqlParser.scala
@@ -137,7 +137,8 @@ class HoodieSpark4_0ExtendedSqlParser(session: SparkSession, delegate: ParserInt
       normalized.contains("create index") ||
       normalized.contains("drop index") ||
       normalized.contains("show indexes") ||
-      normalized.contains("refresh index")
+      normalized.contains("refresh index") ||
+      normalized.contains(" blob")
   }
 
   override def parseRoutineParam(sqlText: String): StructType = throw new UnsupportedOperationException()

