This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f1114af22b52d663ad24f3fa5844464e65981be7
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Sun Sep 10 20:13:56 2023 -0500

    [HUDI-6835] Adjust spark sql core flow test scenarios (#9664)
---
 .../hudi/functional/TestSparkSqlCoreFlow.scala     | 160 ++++++++++-----------
 1 file changed, 76 insertions(+), 84 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
index 7510204bac4..220c6930c4f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -46,24 +46,22 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
 
   //params for core flow tests
   val params: List[String] = List(
-    "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "MERGE_ON_READ|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "MERGE_ON_READ|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "MERGE_ON_READ|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
+    "COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
+    "COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
+    "COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
+    "COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
+    "COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
+    "COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
+    "COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
+    "COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
+    "MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
+    "MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
+    "MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
+    "MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
+    "MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE" ) //extracts the params and runs each core flow test @@ -73,16 +71,15 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { withTempDir { basePath => testCoreFlows(basePath, tableType = splits(0), - isMetadataEnabledOnWrite = splits(1).toBoolean, - isMetadataEnabledOnRead = splits(2).toBoolean, - keyGenClass = splits(3), - indexType = splits(4)) + isMetadataEnabled = splits(1).toBoolean, + keyGenClass = splits(2), + indexType = splits(3)) } } } - def testCoreFlows(basePath: File, tableType: String, isMetadataEnabledOnWrite: Boolean, - isMetadataEnabledOnRead: Boolean, keyGenClass: String, indexType: String): Unit = { + def testCoreFlows(basePath: File, tableType: String, isMetadataEnabled: Boolean, + keyGenClass: String, indexType: String): Unit = { //Create table and set up for testing val tableName = generateTableName val tableBasePath = basePath.getCanonicalPath + "/" + tableName @@ -93,30 +90,30 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { //Bulk insert first set of records val inputDf0 = generateInserts(dataGen, "000", 100).cache() - insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, isMetadataEnabledOnWrite, 1) + insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, isMetadataEnabled, 1) assertTrue(hasNewCommits(fs, tableBasePath, "000")) //Verify bulk insert works correctly - val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabledOnRead).cache() + val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache() assertEquals(100, snapshotDf1.count()) compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1) snapshotDf1.unpersist(true) //Test updated records val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache() - insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabledOnWrite, 2) + insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 2) val commitInstantTime2 = latestCommit(fs, tableBasePath) - val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabledOnRead).cache() + val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache() assertEquals(100, snapshotDf2.count()) compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1) snapshotDf2.unpersist(true) val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache() val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count() - insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabledOnWrite,3) + insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 3) val commitInstantTime3 = latestCommit(fs, tableBasePath) assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size()) - val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabledOnRead).cache() + val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache() assertEquals(100, snapshotDf3.count()) compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3) snapshotDf3.unpersist(true) @@ -133,7 +130,7 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { assertEquals(firstCommit, countsPerCommit(0).get(0).toString) val inputDf3 = generateUniqueUpdates(dataGen, "003", 80).cache() - insertInto(tableName, tableBasePath, inputDf3, UPSERT, isMetadataEnabledOnWrite, 4) + insertInto(tableName, tableBasePath, inputDf3, UPSERT, isMetadataEnabled, 4) //another incremental query with 
    //HUDI-5266
@@ -158,23 +155,23 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     timeTravelDf.unpersist(true)
 
     if (tableType.equals("MERGE_ON_READ")) {
-      val readOptDf = doMORReadOptimizedQuery(isMetadataEnabledOnRead, tableBasePath)
+      val readOptDf = doMORReadOptimizedQuery(isMetadataEnabled, tableBasePath)
       compareEntireInputDfWithHudiDf(inputDf0, readOptDf)
 
-      val snapshotDf4 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+      val snapshotDf4 = doSnapshotRead(tableName, isMetadataEnabled)
       // trigger compaction and try out Read optimized query.
       val inputDf4 = generateUniqueUpdates(dataGen, "004", 40).cache
       //count is increased by 2 because inline compaction will add extra commit to the timeline
-      doInlineCompact(tableName, tableBasePath, inputDf4, UPSERT, isMetadataEnabledOnWrite, "3", 6)
-      val snapshotDf5 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+      doInlineCompact(tableName, tableBasePath, inputDf4, UPSERT, isMetadataEnabled, "3", 6)
+      val snapshotDf5 = doSnapshotRead(tableName, isMetadataEnabled)
       snapshotDf5.cache()
       compareUpdateDfWithHudiDf(inputDf4, snapshotDf5, snapshotDf4)
       inputDf4.unpersist(true)
       snapshotDf5.unpersist(true)
 
       // compaction is expected to have completed. both RO and RT are expected to return same results.
-      compareROAndRT(isMetadataEnabledOnRead, tableName, tableBasePath)
+      compareROAndRT(isMetadataEnabled, tableName, tableBasePath)
     }
 
     inputDf0.unpersist(true)
@@ -371,42 +368,38 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
 
   //params for immutable user flow
   val paramsForImmutable: List[String] = List(
-    "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
"COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM" + "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + 
"COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", + "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", + "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE" ) //extracts the params and runs each immutable user flow test @@ -419,21 +412,20 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { } else if (splits(1).equals("bulk_insert")) { BULK_INSERT } else { - UPSERT + throw new UnsupportedOperationException("This test is only meant for immutable operations.") } testImmutableUserFlow(basePath, tableType = splits(0), writeOp = writeOp, - isMetadataEnabledOnWrite = splits(2).toBoolean, - isMetadataEnabledOnRead = splits(3).toBoolean, - keyGenClass = splits(4), - indexType = splits(5)) + isMetadataEnabled = splits(2).toBoolean, + keyGenClass = splits(3), + indexType = splits(4)) } } } def testImmutableUserFlow(basePath: File, tableType: String, writeOp: WriteOperationType, - isMetadataEnabledOnWrite: Boolean, isMetadataEnabledOnRead: Boolean, keyGenClass: String, + isMetadataEnabled: Boolean, keyGenClass: String, indexType: String): Unit = { val tableName = generateTableName val tableBasePath = basePath.getCanonicalPath + "/" + tableName @@ -444,31 +436,31 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { //Insert Operation val dataGen = new HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 0xDEED) val inputDf0 = generateInserts(dataGen, "000", 100).cache - insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, isMetadataEnabledOnWrite, 1) + insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, isMetadataEnabled, 1) assertTrue(hasNewCommits(fs, tableBasePath, "000")) //Snapshot query - val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled) assertEquals(100, snapshotDf1.count()) compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1) val inputDf1 = generateInserts(dataGen, "001", 50).cache - insertInto(tableName, tableBasePath, inputDf1, writeOp, isMetadataEnabledOnWrite, 2) + insertInto(tableName, tableBasePath, inputDf1, writeOp, isMetadataEnabled, 2) - val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabledOnRead).cache + val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache assertEquals(150, snapshotDf2.count()) compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0), snapshotDf2) snapshotDf2.unpersist(true) val inputDf2 = generateInserts(dataGen, "002", 60).cache() - insertInto(tableName, tableBasePath, inputDf2, writeOp, isMetadataEnabledOnWrite, 3) + insertInto(tableName, tableBasePath, inputDf2, writeOp, isMetadataEnabled, 3) assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size()) // Snapshot Query - val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabledOnRead).cache + val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache assertEquals(210, 
     compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0).union(inputDf2), snapshotDf3)
     snapshotDf3.unpersist(true)
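
For readers skimming the patch, here is a minimal, illustrative sketch (not code from the patch) of how the adjusted pipe-delimited parameter strings decompose after this change: the core flow params drop from five fields to four (a single isMetadataEnabled flag replaces the separate write/read metadata flags), and the immutable flow params drop from six fields to five. The object, case class, and helper names below are assumed for illustration only; the test itself simply indexes into splits(...) as the diff shows.

object ParamSketch {
  // Core flow format after the change: tableType|isMetadataEnabled|keyGenClass|indexType
  final case class CoreFlowParam(tableType: String, isMetadataEnabled: Boolean,
                                 keyGenClass: String, indexType: String)

  // Immutable flow format after the change: tableType|writeOp|isMetadataEnabled|keyGenClass|indexType
  final case class ImmutableFlowParam(tableType: String, writeOp: String,
                                      isMetadataEnabled: Boolean, keyGenClass: String,
                                      indexType: String)

  def parseCoreFlow(param: String): CoreFlowParam = {
    val splits = param.split('|')
    CoreFlowParam(splits(0), splits(1).toBoolean, splits(2), splits(3))
  }

  def parseImmutableFlow(param: String): ImmutableFlowParam = {
    val splits = param.split('|')
    // Only insert/bulk_insert are valid here; the patch now fails fast for any
    // other operation instead of silently falling back to UPSERT.
    require(splits(1) == "insert" || splits(1) == "bulk_insert",
      "This test is only meant for immutable operations.")
    ImmutableFlowParam(splits(0), splits(1), splits(2).toBoolean, splits(3), splits(4))
  }

  def main(args: Array[String]): Unit = {
    println(parseCoreFlow("COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM"))
    println(parseImmutableFlow("MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE"))
  }
}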