voonhous commented on code in PR #18583:
URL: https://github.com/apache/hudi/pull/18583#discussion_r3200918529
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml.schema
+
+import org.apache.hudi.blob.BlobTestHelpers
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+import java.io.File
+
+class TestBlobDataType extends HoodieSparkSqlTestBase {
+
+  private val referenceStructType =
+    "struct<external_path:string, offset:bigint, length:bigint, managed:boolean>"
+
+  private def inlineBlobLiteral(hex: String): String =
+    s"""named_struct(
+       |  'type', 'INLINE',
+       |  'data', cast(X'$hex' as binary),
+       |  'reference', cast(null as $referenceStructType)
+       |)""".stripMargin
+
+  private def outOfLineBlobLiteral(path: String, offset: Long, length: Long): String =
+    s"""named_struct(
+       |  'type', 'OUT_OF_LINE',
+       |  'data', cast(null as binary),
+       |  'reference', named_struct(
+       |    'external_path', '$path',
+       |    'offset', cast($offset as bigint),
+       |    'length', cast($length as bigint),
+       |    'managed', false
+       |  )
+       |)""".stripMargin
+
+  test("Test Query Log Only MOR Table With BLOB INLINE column triggers compaction") {
+    withRecordType()(withTempDir { tmp =>
+      val tablePath = new File(tmp, "hudi").getCanonicalPath
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  data blob,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY',
+           |  hoodie.compact.inline = 'true',
+           |  hoodie.clean.commits.retained = '1'
+           | )
+       """.stripMargin)
+
+      spark.sql(s"insert into $tableName values (1, ${inlineBlobLiteral("01")}, 1000)")
+      spark.sql(s"insert into $tableName values (2, ${inlineBlobLiteral("02")}, 1000)")
+      spark.sql(s"insert into $tableName values (3, ${inlineBlobLiteral("03")}, 1000)")
+      // 3 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 1 as id, ${inlineBlobLiteral("11")} as data, 1001L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      // 4 commits will not trigger compaction, so it should be log only.
+      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 4 as id, ${inlineBlobLiteral("04")} as data, 1000L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when not matched then insert *
+           |""".stripMargin)
+
+      // 5 commits will trigger compaction.
+      assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+      // read_blob() on an INLINE column returns the inline bytes directly; verify the
+      // post-compaction bytes match what was written.
+      val bytesById = spark.sql(
+        s"select id, read_blob(data) as bytes from $tableName order by id"
+      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+      assertResult(4)(bytesById.size)
+      assert(bytesById(1).sameElements(Array(0x11.toByte)))
+      assert(bytesById(2).sameElements(Array(0x02.toByte)))
+      assert(bytesById(3).sameElements(Array(0x03.toByte)))
+      assert(bytesById(4).sameElements(Array(0x04.toByte)))
+
+      // Verify inline shape: type='INLINE', data non-null, reference null.
+      spark.sql(s"select id, data from $tableName order by id").collect().foreach { row =>
+        val blob = row.getStruct(1)
+        assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE)))
+        assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)))
+        assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)))
+      }
+
+      // BLOB custom-type descriptor must survive the compacted base-file read path.
+      val blobField = spark.table(tableName).schema.find(_.name == "data").get
+      assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
+        s"Expected BLOB type metadata on data field after compaction, " +
+          s"got: ${blobField.metadata}")
+      assertResult(HoodieSchemaType.BLOB.name())(
+        blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+      // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+      // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+      // slice was not yet superseded when that clean fired and no .clean instant was
+      // written. This deltacommit's postCommit clean writes the .clean instant.
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 2 as id, ${inlineBlobLiteral("22")} as data, 1002L as ts
+           |) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+      val updatedBytesById = spark.sql(
+        s"select id, read_blob(data) as bytes from $tableName order by id"
+      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+      assert(updatedBytesById(2).sameElements(Array(0x22.toByte)))
+
+      val metaClient = createMetaClient(spark, tablePath)
+      metaClient.reloadActiveTimeline()
+      assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+        "Expected at least one .clean instant on the timeline after compaction")
+    })
+  }
+
+  test("Test Query Log Only MOR Table With BLOB OUT_OF_LINE column triggers compaction") {
+    withRecordType()(withTempDir { tmp =>

Review Comment:
   Skipping this one. The two tests diverge on the literal builder, the byte assertions, and the post-compaction shape check (data null vs. reference null), so a shared helper would need 3 callbacks for 2 callers. The duplication is honest.
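   To make the trade-off concrete, the shared runner would have to look roughly like this (a sketch only; the name and callback granularity are illustrative, not code from the PR):

   ```scala
   // Hypothetical shared runner: three behavior callbacks for only two call sites.
   private def runBlobCompactionTest(
       blobLiteral: String => String,                    // INLINE vs. OUT_OF_LINE literal builder
       expectedBytes: Int => Array[Byte],                // per-id byte expectations after compaction
       assertBlobShape: org.apache.spark.sql.Row => Unit // data-null vs. reference-null shape check
   ): Unit = {
     // the shared create/insert/merge/compaction choreography would live here
   }
   ```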
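   The literal typing is easy to confirm from a spark-shell (a minimal check; output shown as it renders on Spark 3.x, where fractional literals parse as exact decimals):

   ```scala
   // Fractional SQL literals are typed as DecimalType, so without the cast the
   // column would be array<decimal(1,1)>, not the array<float> backing VECTOR(3).
   spark.sql("select 0.1 as x").schema("x").dataType
   // => DecimalType(1,1)
   spark.sql("select cast(0.1 as float) as x").schema("x").dataType
   // => FloatType
   ```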
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala:
##########
@@ -674,6 +675,129 @@ class TestVectorDataSource extends HoodieSparkClientTestBase {
+      def readOrdered(): Seq[Row] =
+        spark.sql(s"select id, embedding, ts from $tableName order by id").collect().toSeq
+
+      def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
+        rows.find(_.getInt(0) == id).get.getSeq[Float](1)

Review Comment:
   Done, switched to getOrElse(fail(...)).
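   The post-review helper reads roughly as follows (a sketch: the fail(...) message wording and the JUnit 5 import are assumed, not quoted from the PR):

   ```scala
   import org.junit.jupiter.api.Assertions.fail

   def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
     rows.find(_.getInt(0) == id)
       .getOrElse(fail(s"No row found for id=$id")) // fails the test instead of leaking NoSuchElementException from .get
       .getSeq[Float](1)
   ```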
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala:
##########
@@ -128,6 +130,119 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
   }
 
+  test("Test Query Log Only MOR Table With VARIANT column triggers compaction") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+    withRecordType()(withTempDir { tmp =>

Review Comment:
   Done, extracted val tablePath.

##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala:
##########
@@ -0,0 +1,283 @@
+  private def inlineBlobLiteral(hex: String): String =
+    s"""named_struct(
+       |  'type', 'INLINE',
+       |  'data', cast(X'$hex' as binary),
+       |  'reference', cast(null as $referenceStructType)
+       |)""".stripMargin
+
+  private def outOfLineBlobLiteral(path: String, offset: Long, length: Long): String =
+    s"""named_struct(
+       |  'type', 'OUT_OF_LINE',
+       |  'data', cast(null as binary),
+       |  'reference', named_struct(
+       |    'external_path', '$path',
+       |    'offset', cast($offset as bigint),
+       |    'length', cast($length as bigint),
+       |    'managed', false
+       |  )
+       |)""".stripMargin

Review Comment:
   Skipping. Paths come from the JUnit tmp dir and BlobTestHelpers.createTestFile, so they will not contain single quotes in practice. Not worth a helper for a test.
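   Should a quoted path ever show up, the guard is a one-liner at the splice point rather than a framework (illustrative sketch using standard SQL single-quote doubling; the helper name is not from the PR):

   ```scala
   // Illustrative only, per the comment above this is deliberately not added:
   // double any embedded single quote before splicing a path into a SQL literal.
   private def sqlEscaped(path: String): String = path.replace("'", "''")
   ```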
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala:
##########
@@ -0,0 +1,283 @@
+  test("Test Query Log Only MOR Table With BLOB INLINE column triggers compaction") {
+    withRecordType()(withTempDir { tmp =>
+      val tablePath = new File(tmp, "hudi").getCanonicalPath
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  data blob,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY',
+           |  hoodie.compact.inline = 'true',
+           |  hoodie.clean.commits.retained = '1'
+           | )

Review Comment:
   Pinned hoodie.compact.inline.max.delta.commits = '5' on both tables.
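   Concretely, the pin sits next to the existing inline-compaction knob in both create-table statements (diff sketch; '5' matches the tests' "5 commits will trigger compaction" comments rather than leaning on the engine default):

   ```diff
      hoodie.compact.inline = 'true',
   +  hoodie.compact.inline.max.delta.commits = '5',
      hoodie.clean.commits.retained = '1'
   ```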
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]