This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e9c7c5bbdc test(schema): Add MOR log-only compaction tests for custom types (#18583)
34e9c7c5bbdc is described below

commit 34e9c7c5bbdc30143a3f2dbb6931149f6350357f
Author: voonhous <[email protected]>
AuthorDate: Thu May 7 19:05:05 2026 +0800

    test(schema): Add MOR log-only compaction tests for custom types (#18583)
    
    * test(schema): Add MOR log-only compaction tests for custom types
    
    Cover the invariant that the HoodieSchema.TYPE_METADATA_FIELD descriptor
    and payload shape of a custom-typed column survive inline compaction of
    a log-only MOR table into a base file.
    
    - TestVectorDataSource: add testMorLogOnlyCompactionPreservesVectorMetadata
      (5 commits via SQL + MERGE INTO to trigger default inline compaction).
    - TestVariantDataType: equivalent VARIANT test, gated on Spark 4.0+,
      asserting native VariantType round-trips through compaction.
    - TestBlobDataType (new): BLOB INLINE and BLOB OUT_OF_LINE cases. Inline
      uses named_struct with hex byte literals; out-of-line creates real files
      via BlobTestHelpers.createTestFile and verifies bytes via read_blob().
    
    * test(schema): Address review comments on MOR log-only compaction tests
    
    - Pin hoodie.compact.inline.max.delta.commits = '5' on all 4 tables so
      compaction triggers deterministically rather than via the implicit
      default
    - Rename path to externalPath in outOfLineBlobLiteral
    - Fail with the missing id in embeddingOf instead of a bare .get
    - Extract val tablePath in the variant test for consistency
---
 .../hudi/functional/TestVectorDataSource.scala     | 129 +++++++++-
 .../sql/hudi/dml/schema/TestBlobDataType.scala     | 285 +++++++++++++++++++++
 .../sql/hudi/dml/schema/TestVariantDataType.scala  | 117 +++++++++
 3 files changed, 530 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
index 81eb776b5482..daf0c8a6c19b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
@@ -19,7 +19,8 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
-import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}
 
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetFileReader
@@ -674,6 +675,132 @@ class TestVectorDataSource extends HoodieSparkClientTestBase {
     assertTrue(r7.getSeq[Double](1).forall(_ == 1.0), "key_7 should have original value 1.0")
   }
 
+  @Test
+  def testMorLogOnlyCompactionPreservesVectorMetadata(): Unit = {
+    val path = basePath + "/mor_log_only_vec"
+    val tableName = "mor_log_only_vec_test"
+    try {
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  embedding VECTOR(3),
+           |  ts long
+           |) using hudi
+           | location '$path'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY',
+           |  hoodie.compact.inline = 'true',
+           |  hoodie.compact.inline.max.delta.commits = '5',
+           |  hoodie.clean.commits.retained = '1'
+           | )
+       """.stripMargin)
+
+      def readOrdered(): Seq[Row] =
+        spark.sql(s"select id, embedding, ts from $tableName order by 
id").collect().toSeq
+
+      def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
+        rows.find(_.getInt(0) == id)
+          .getOrElse(fail(s"No row with id=$id"))
+          .getSeq[Float](1)
+
+      spark.sql(
+        s"insert into $tableName values " +
+          "(1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as 
float)), 1000)")
+      spark.sql(
+        s"insert into $tableName values " +
+          "(2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as 
float)), 1000)")
+      spark.sql(
+        s"insert into $tableName values " +
+          "(3, array(cast(0.7 as float), cast(0.8 as float), cast(0.9 as 
float)), 1000)")
+      // 3 commits will not trigger compaction, so it should be log only.
+      assertTrue(DataSourceTestUtils.isLogFileOnly(path))
+      val afterInserts = readOrdered()
+      assertEquals(3, afterInserts.size)
+      assertEquals(Seq(0.1f, 0.2f, 0.3f), embeddingOf(1, afterInserts))
+      assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterInserts))
+      assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterInserts))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 1 as id,
+           |         array(cast(0.11 as float), cast(0.22 as float), cast(0.33 as float)) as embedding,
+           |         1001L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      // 4 commits will not trigger compaction, so it should be log only.
+      assertTrue(DataSourceTestUtils.isLogFileOnly(path))
+      val afterUpdate = readOrdered()
+      assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterUpdate))
+      assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterUpdate))
+      assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterUpdate))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 4 as id,
+           |         array(cast(0.44 as float), cast(0.55 as float), cast(0.66 as float)) as embedding,
+           |         1000L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when not matched then insert *
+           |""".stripMargin)
+
+      // 5 commits will trigger compaction.
+      assertFalse(DataSourceTestUtils.isLogFileOnly(path))
+      val afterCompaction = readOrdered()
+      assertEquals(4, afterCompaction.size)
+      assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCompaction))
+      assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterCompaction))
+      assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCompaction))
+      assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCompaction))
+
+      // VECTOR custom-type descriptor must survive the compacted base-file read path.
+      val embeddingField = spark.table(tableName).schema.find(_.name == "embedding").get
+      assertTrue(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
+        s"Expected VECTOR type metadata on embedding field after compaction, " 
+
+          s"got: ${embeddingField.metadata}")
+
+      // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+      // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+      // slice was not yet superseded when that clean fired and no .clean instant was
+      // written. This deltacommit's postCommit clean sees the post-compaction base
+      // file and writes the .clean instant.
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 2 as id,
+           |         array(cast(0.222 as float), cast(0.555 as float), cast(0.888 as float)) as embedding,
+           |         1002L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      val afterCleanup = readOrdered()
+      assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCleanup))
+      assertEquals(Seq(0.222f, 0.555f, 0.888f), embeddingOf(2, afterCleanup))
+      assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCleanup))
+      assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCleanup))
+
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(path).setConf(storageConf).build()
+      metaClient.reloadActiveTimeline()
+      assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+        "Expected at least one .clean instant on the timeline after compaction")
+    } finally {
+      spark.sql(s"drop table if exists $tableName")
+    }
+  }
+
   @Test
   def testDimensionMismatchOnWrite(): Unit = {
     // Schema declares VECTOR(8) but data has arrays of length 4
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala
new file mode 100644
index 000000000000..a1197fd7a89b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml.schema
+
+import org.apache.hudi.blob.BlobTestHelpers
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+import java.io.File
+
+class TestBlobDataType extends HoodieSparkSqlTestBase {
+
+  private val referenceStructType =
+    "struct<external_path:string, offset:bigint, length:bigint, 
managed:boolean>"
+
+  private def inlineBlobLiteral(hex: String): String =
+    s"""named_struct(
+       |  'type', 'INLINE',
+       |  'data', cast(X'$hex' as binary),
+       |  'reference', cast(null as $referenceStructType)
+       |)""".stripMargin
+
+  private def outOfLineBlobLiteral(externalPath: String, offset: Long, length: Long): String =
+    s"""named_struct(
+       |  'type', 'OUT_OF_LINE',
+       |  'data', cast(null as binary),
+       |  'reference', named_struct(
+       |    'external_path', '$externalPath',
+       |    'offset', cast($offset as bigint),
+       |    'length', cast($length as bigint),
+       |    'managed', false
+       |  )
+       |)""".stripMargin
+
+  test("Test Query Log Only MOR Table With BLOB INLINE column triggers 
compaction") {
+    withRecordType()(withTempDir { tmp =>
+      val tablePath = new File(tmp, "hudi").getCanonicalPath
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  data blob,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY',
+           |  hoodie.compact.inline = 'true',
+           |  hoodie.compact.inline.max.delta.commits = '5',
+           |  hoodie.clean.commits.retained = '1'
+           | )
+       """.stripMargin)
+
+      spark.sql(s"insert into $tableName values (1, 
${inlineBlobLiteral("01")}, 1000)")
+      spark.sql(s"insert into $tableName values (2, 
${inlineBlobLiteral("02")}, 1000)")
+      spark.sql(s"insert into $tableName values (3, 
${inlineBlobLiteral("03")}, 1000)")
+      // 3 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 1 as id, ${inlineBlobLiteral("11")} as data, 1001L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      // 4 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 4 as id, ${inlineBlobLiteral("04")} as data, 1000L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when not matched then insert *
+           |""".stripMargin)
+
+      // 5 commits will trigger compaction.
+      assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      // read_blob() on an INLINE column returns the inline bytes directly, verify the
+      // post-compaction bytes match what was written.
+      val bytesById = spark.sql(
+        s"select id, read_blob(data) as bytes from $tableName order by id"
+      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+      assertResult(4)(bytesById.size)
+      assert(bytesById(1).sameElements(Array(0x11.toByte)))
+      assert(bytesById(2).sameElements(Array(0x02.toByte)))
+      assert(bytesById(3).sameElements(Array(0x03.toByte)))
+      assert(bytesById(4).sameElements(Array(0x04.toByte)))
+
+      // Verify inline shape: type='INLINE', data non-null, reference null.
+      spark.sql(s"select id, data from $tableName order by 
id").collect().foreach { row =>
+        val blob = row.getStruct(1)
+        assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE)))
+        assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)))
+        assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)))
+      }
+
+      // BLOB custom-type descriptor must survive the compacted base-file read path.
+      val blobField = spark.table(tableName).schema.find(_.name == "data").get
+      assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
+        s"Expected BLOB type metadata on data field after compaction, " +
+          s"got: ${blobField.metadata}")
+      assertResult(HoodieSchemaType.BLOB.name())(
+        blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+      // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+      // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+      // slice was not yet superseded when that clean fired and no .clean instant was
+      // written. This deltacommit's postCommit clean writes the .clean instant.
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 2 as id, ${inlineBlobLiteral("22")} as data, 1002L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      val updatedBytesById = spark.sql(
+        s"select id, read_blob(data) as bytes from $tableName order by id"
+      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+      assert(updatedBytesById(2).sameElements(Array(0x22.toByte)))
+
+      val metaClient = createMetaClient(spark, tablePath)
+      metaClient.reloadActiveTimeline()
+      assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+        "Expected at least one .clean instant on the timeline after compaction")
+    })
+  }
+
+  test("Test Query Log Only MOR Table With BLOB OUT_OF_LINE column triggers 
compaction") {
+    withRecordType()(withTempDir { tmp =>
+      val tablePath = new File(tmp, "hudi").getCanonicalPath
+      val blobDir = new File(tmp, "blobs")
+      blobDir.mkdirs()
+      // createTestFile writes bytes where byte[i] = i % 256, assertBytesContent
+      // checks round-trip against that pattern.
+      val file1 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1.bin", 100)
+      val file2 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2.bin", 100)
+      val file3 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob3.bin", 100)
+      val file4 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob4.bin", 100)
+      val file1Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1_updated.bin", 80)
+      val file2Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2_updated.bin", 60)
+
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  data blob,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY',
+           |  hoodie.compact.inline = 'true',
+           |  hoodie.compact.inline.max.delta.commits = '5',
+           |  hoodie.clean.commits.retained = '1'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"insert into $tableName values (1, ${outOfLineBlobLiteral(file1, 0L, 
100L)}, 1000)")
+      spark.sql(
+        s"insert into $tableName values (2, ${outOfLineBlobLiteral(file2, 0L, 
100L)}, 1000)")
+      spark.sql(
+        s"insert into $tableName values (3, ${outOfLineBlobLiteral(file3, 0L, 
100L)}, 1000)")
+      // 3 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 1 as id, ${outOfLineBlobLiteral(file1Updated, 0L, 80L)} as data, 1001L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      // 4 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 4 as id, ${outOfLineBlobLiteral(file4, 0L, 100L)} as data, 1000L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when not matched then insert *
+           |""".stripMargin)
+
+      // 5 commits will trigger compaction.
+      assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      // read_blob() on an OUT_OF_LINE column must dereference external_path and read
+      // the referenced byte range, verify bytes from the compacted base-file plan.
+      val bytesById = spark.sql(
+        s"select id, read_blob(data) as bytes from $tableName order by id"
+      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+      assertResult(4)(bytesById.size)
+      assertResult(80)(bytesById(1).length)
+      BlobTestHelpers.assertBytesContent(bytesById(1))
+      assertResult(100)(bytesById(2).length)
+      BlobTestHelpers.assertBytesContent(bytesById(2))
+      assertResult(100)(bytesById(3).length)
+      BlobTestHelpers.assertBytesContent(bytesById(3))
+      assertResult(100)(bytesById(4).length)
+      BlobTestHelpers.assertBytesContent(bytesById(4))
+
+      // Verify out-of-line shape: type='OUT_OF_LINE', data null, reference non-null.
+      spark.sql(s"select id, data from $tableName order by id").collect().foreach { row =>
+        val blob = row.getStruct(1)
+        assertResult("OUT_OF_LINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE)))
+        assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)))
+        assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)))
+      }
+
+      // BLOB custom-type descriptor must survive the compacted base-file read path.
+      val blobField = spark.table(tableName).schema.find(_.name == "data").get
+      assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
+        s"Expected BLOB type metadata on data field after compaction, " +
+          s"got: ${blobField.metadata}")
+      assertResult(HoodieSchemaType.BLOB.name())(
+        blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+      // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+      // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+      // slice was not yet superseded when that clean fired and no .clean instant was
+      // written. This deltacommit's postCommit clean writes the .clean instant.
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 2 as id, ${outOfLineBlobLiteral(file2Updated, 0L, 60L)} as data, 1002L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      val updatedBytesById = spark.sql(
+        s"select id, read_blob(data) as bytes from $tableName order by id"
+      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+      assertResult(60)(updatedBytesById(2).length)
+      BlobTestHelpers.assertBytesContent(updatedBytesById(2))
+
+      val metaClient = createMetaClient(spark, tablePath)
+      metaClient.reloadActiveTimeline()
+      assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+        "Expected at least one .clean instant on the timeline after compaction")
+    })
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index 443a316abc8c..56bb3270b5a1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -24,6 +24,8 @@ import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.internal.schema.HoodieSchemaException
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -128,6 +130,121 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Query Log Only MOR Table With VARIANT column triggers 
compaction") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or 
higher")
+
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY',
+           |  hoodie.compact.inline = 'true',
+           |  hoodie.compact.inline.max.delta.commits = '5',
+           |  hoodie.clean.commits.retained = '1'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"insert into $tableName values " +
+          "(1, parse_json('{\"key\":\"value1\"}'), 1000)")
+      spark.sql(
+        s"insert into $tableName values " +
+          "(2, parse_json('{\"key\":\"value2\"}'), 1000)")
+      spark.sql(
+        s"insert into $tableName values " +
+          "(3, parse_json('{\"key\":\"value3\"}'), 1000)")
+      // 3 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+      checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "{\"key\":\"value1\"}", 1000),
+        Seq(2, "{\"key\":\"value2\"}", 1000),
+        Seq(3, "{\"key\":\"value3\"}", 1000)
+      )
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 1 as id,
+           |         parse_json('{"key":"v1-merged"}') as v,
+           |         1001L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      // 4 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+      checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "{\"key\":\"v1-merged\"}", 1001),
+        Seq(2, "{\"key\":\"value2\"}", 1000),
+        Seq(3, "{\"key\":\"value3\"}", 1000)
+      )
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 4 as id,
+           |         parse_json('{"key":"value4"}') as v,
+           |         1000L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when not matched then insert *
+           |""".stripMargin)
+
+      // 5 commits will trigger compaction.
+      assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))
+      checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "{\"key\":\"v1-merged\"}", 1001),
+        Seq(2, "{\"key\":\"value2\"}", 1000),
+        Seq(3, "{\"key\":\"value3\"}", 1000),
+        Seq(4, "{\"key\":\"value4\"}", 1000)
+      )
+
+      // VARIANT must round-trip as native VariantType through the compacted base-file read path.
+      val variantField = spark.table(tableName).schema.find(_.name == "v").get
+      assertResult("variant")(variantField.dataType.typeName)
+
+      // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+      // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+      // slice was not yet superseded when that clean fired and no .clean instant was
+      // written. This deltacommit's postCommit clean writes the .clean instant.
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 2 as id,
+           |         parse_json('{"key":"v2-merged"}') as v,
+           |         1002L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "{\"key\":\"v1-merged\"}", 1001),
+        Seq(2, "{\"key\":\"v2-merged\"}", 1002),
+        Seq(3, "{\"key\":\"value3\"}", 1000),
+        Seq(4, "{\"key\":\"value4\"}", 1000)
+      )
+
+      val metaClient = createMetaClient(spark, tablePath)
+      metaClient.reloadActiveTimeline()
+      assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+        "Expected at least one .clean instant on the timeline after compaction")
+    })
+  }
+
   test("Test toHiveCompatibleSchema converts VariantType to physical struct") {
     assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
 
