This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990-Part3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 44911bca9b6433fa573adeb441a1a117858d671f
Author: YueZhang <[email protected]>
AuthorDate: Tue Apr 8 15:51:47 2025 +0800

    finish dry-run && rollback-command && show-config-command && add-expression-command
---
 .../model/PartitionBucketIndexHashingConfig.java   |  48 +++--
 .../procedures/PartitionBucketIndexManager.scala   | 204 ++++++++++++---------
 .../TestInsertTableWithPartitionBucketIndex.scala  | 154 ++++++++++++++--
 3 files changed, 289 insertions(+), 117 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
index 4467607ef8b..10f8e249161 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
@@ -198,21 +198,44 @@ public class PartitionBucketIndexHashingConfig implements Serializable {
    * Get Latest committed hashing config instant to load.
    * If instant is empty, then return latest hashing config instant
    */
-  public static Option<String> getHashingConfigInstantToLoad(HoodieTableMetaClient metaClient, Option<String> instant) {
+  public static Option<StoragePath> getHashingConfigToLoad(HoodieTableMetaClient metaClient, Option<String> instant) {
     try {
+      String basePath = metaClient.getBasePath().toString();
       List<String> allCommittedHashingConfig = getCommittedHashingConfigInstants(metaClient);
       if (instant.isPresent()) {
-        Option<String> res = getHashingConfigInstantToLoadBeforeOrOn(allCommittedHashingConfig, instant.get());
+        Option<StoragePath> res = getHashingConfigInstantToLoadBeforeOrOn(allCommittedHashingConfig, instant.get()).map(i -> {
+          return getHashingConfigPath(basePath, i);
+        });
         // fall back to look up archived hashing config instant before return empty
-        return res.isPresent() ? res : getHashingConfigInstantToLoadBeforeOrOn(getArchiveHashingConfigInstants(metaClient), instant.get());
+        return res.isPresent() ?
res : getHashingConfigInstantToLoadBeforeOrOn(getArchiveHashingConfigInstants(metaClient), instant.get()).map(i -> { + return getArchiveHashingConfigPath(basePath, i); + }); } else { - return Option.of(allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1)); + return Option.of(allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1)).map(i -> { + return getHashingConfigPath(basePath, i); + }); } } catch (Exception e) { throw new HoodieException("Failed to get hashing config instant to load.", e); } } + public static List<PartitionBucketIndexHashingConfig> getAllHashingConfig(HoodieTableMetaClient metaClient) throws IOException { + String basePath = metaClient.getBasePath().toString(); + List<StoragePath> allHashingConfig = getCommittedHashingConfigInstants(metaClient).stream().map(instant -> { + return getHashingConfigPath(basePath, instant); + }).collect(Collectors.toList()); + + if (metaClient.getStorage().exists(new StoragePath(metaClient.getArchiveHashingMetadataConfigPath()))) { + allHashingConfig.addAll(getArchiveHashingConfigInstants(metaClient).stream().map(instant -> { + return getArchiveHashingConfigPath(basePath, instant); + }).collect(Collectors.toList())); + } + return allHashingConfig.stream().map(hashingConfigPath -> { + return loadHashingConfig(metaClient.getStorage(), hashingConfigPath); + }).filter(Option::isPresent).map(Option::get).collect(Collectors.toList()); + } + private static Option<String> getHashingConfigInstantToLoadBeforeOrOn(List<String> hashingConfigInstants, String instant) { List<String> res = hashingConfigInstants.stream().filter(hashingConfigInstant -> { return hashingConfigInstant.compareTo(instant) <= 0; @@ -221,20 +244,19 @@ public class PartitionBucketIndexHashingConfig implements Serializable { } public static PartitionBucketIndexHashingConfig loadingLatestHashingConfig(HoodieTableMetaClient metaClient) { - Option<String> instantToLoad = getHashingConfigInstantToLoad(metaClient, Option.empty()); - ValidationUtils.checkArgument(instantToLoad.isPresent(), "Can not load latest hashing config " + instantToLoad); - Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad.get())); - ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load latest hashing config " + instantToLoad); + Option<StoragePath> hashingConfigToLoad = getHashingConfigToLoad(metaClient, Option.empty()); + ValidationUtils.checkArgument(hashingConfigToLoad.isPresent(), "Can not load latest hashing config " + hashingConfigToLoad); + Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), hashingConfigToLoad.get()); + ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load latest hashing config " + hashingConfigToLoad); return latestHashingConfig.get(); } public static Option<PartitionBucketIndexHashingConfig> loadingLatestHashingConfigBeforeOrOn(HoodieTableMetaClient metaClient, String instant) { - Option<String> hashingConfigInstantToLoad = getHashingConfigInstantToLoad(metaClient, Option.of(instant)); - if (hashingConfigInstantToLoad.isPresent()) { - Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), - getHashingConfigPath(metaClient.getBasePath().toString(), hashingConfigInstantToLoad.get())); - ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load hashing config " + 
hashingConfigInstantToLoad + " based on " + instant); + Option<StoragePath> hashingConfigToLoad = getHashingConfigToLoad(metaClient, Option.of(instant)); + if (hashingConfigToLoad.isPresent()) { + Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), hashingConfigToLoad.get()); + ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load hashing config " + hashingConfigToLoad + " based on " + instant); return latestHashingConfig; } else { return Option.empty(); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala index f6b546cc4ab..927dbf85aa9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieSparkSqlWriter} import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, ENABLE_ROW_WRITER, OPERATION} import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig, SerializableSchema} import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.common.fs.FSUtils @@ -29,6 +30,7 @@ import org.apache.hudi.common.table.read.HoodieFileGroupReader import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.util.{Option, ValidationUtils} import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig} +import org.apache.hudi.config.HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE import org.apache.hudi.exception.HoodieException import org.apache.hudi.index.bucket.partition.{PartitionBucketIndexCalculator, PartitionBucketIndexUtils} import org.apache.hudi.internal.schema.InternalSchema @@ -61,9 +63,9 @@ class PartitionBucketIndexManager extends BaseProcedure ProcedureParameter.optional(1, "overwrite", DataTypes.StringType), ProcedureParameter.optional(2, "bucketNumber", DataTypes.IntegerType, -1), ProcedureParameter.optional(3, "add", DataTypes.StringType), - ProcedureParameter.optional(4, "dry-run", DataTypes.BooleanType, true), + ProcedureParameter.optional(4, "dryRun", DataTypes.BooleanType, true), ProcedureParameter.optional(5, "rollback", DataTypes.StringType), - ProcedureParameter.optional(6, "show-config", DataTypes.BooleanType, false), + ProcedureParameter.optional(6, "showConfig", DataTypes.BooleanType, false), ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex"), // params => key=value, key2=value2 ProcedureParameter.optional(8, "options", DataTypes.StringType) @@ -117,11 +119,11 @@ class PartitionBucketIndexManager extends BaseProcedure if (showConfig) { handleShowConfig(metaClient) } else if (rollback != null) { - handleRollback(metaClient, rollback) + handleRollback(writeClient, metaClient, rollback) } else if (overwrite != null) { handleOverwrite(config, context, metaClient, overwrite, bucketNumber, rule, dryRun) } else if (add != null) { - handleAdd(metaClient, add, dryRun) + handleAdd(config, context, metaClient, add, dryRun) } else { 
Seq(Row("ERROR", "INVALID_OPERATION", "No valid operation specified")) } @@ -179,76 +181,88 @@ class PartitionBucketIndexManager extends BaseProcedure // get partitions need to be rescaled val rescalePartitionsMap = getDifferentPartitions(partition2BucketWithNewHashingConfig.asScala, partition2BucketWithLatestHashingConfig.asScala) - val partitionsToRescale = rescalePartitionsMap.keys - - // get all fileSlices need to read - val allFilesMap = FSUtils.getFilesInPartitions(context, metaClient.getStorage(), HoodieMetadataConfig.newBuilder.enable(mdtEnable).build, - metaClient.getBasePath.toString, partitionsToRescale.map(relative => { - new StoragePath(basePath, relative) - }).map(storagePath => storagePath.toString).toArray) - val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList - val view = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, files.asJava) - val allFileSlice = partitionsToRescale.flatMap(partitionPath => { - view.getLatestFileSlices(partitionPath).iterator().asScala - }).toList - - // read all fileSlice para and get DF - var tableSchemaWithMetaFields: Schema = null - try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new TableSchemaResolver(metaClient).getTableAvroSchema(false), false) - catch { - case e: Exception => - throw new HoodieException("Failed to get table schema during clustering", e) - } + if (dryRun) { + logInfo("Dry run OVERWRITE") + val rows = rescalePartitionsMap.map(entry => { + val details = + s""" + |${entry._1} => ${entry._2} + |""".stripMargin + details + }).toSeq + Seq(Row("SUCCESS", "DRY_RUN_OVERWRITE", s"""DETAILS:[$rows]""")) + } else { + logInfo("Perform OVERWRITE with dry-run disabled.") + val partitionsToRescale = rescalePartitionsMap.keys + // get all fileSlices need to read + val allFilesMap = FSUtils.getFilesInPartitions(context, metaClient.getStorage(), HoodieMetadataConfig.newBuilder.enable(mdtEnable).build, + metaClient.getBasePath.toString, partitionsToRescale.map(relative => { + new StoragePath(basePath, relative) + }).map(storagePath => storagePath.toString).toArray) + val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList + val view = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, files.asJava) + val allFileSlice = partitionsToRescale.flatMap(partitionPath => { + view.getLatestFileSlices(partitionPath).iterator().asScala + }).toList + + // read all fileSlice para and get DF + var tableSchemaWithMetaFields: Schema = null + try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new TableSchemaResolver(metaClient).getTableAvroSchema(false), false) + catch { + case e: Exception => + throw new HoodieException("Failed to get table schema during clustering", e) + } - // broadcast reader context. - val broadcastManager = new SparkBroadcastManager(context, metaClient) - broadcastManager.prepareAndBroadcast() - val sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields) + // broadcast reader context. 
+ val broadcastManager = new SparkBroadcastManager(context, metaClient) + broadcastManager.prepareAndBroadcast() + val sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields) - val res: RDD[InternalRow] = if (allFileSlice.isEmpty) { - spark.sparkContext.emptyRDD - } else { - val serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields) - val latestInstantTime = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get() - - spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => { - // instantiate other supporting cast - val readerSchema = serializableTableSchemaWithMetaFields.get - val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath) - val internalSchemaOption: Option[InternalSchema] = Option.empty() - // instantiate FG reader - val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(), - metaClient.getStorage, - basePath.toString, - latestInstantTime.requestedTime(), - fileSlice, - readerSchema, - readerSchema, - internalSchemaOption, // not support evolution of schema for now - metaClient, - metaClient.getTableConfig.getProps, - 0, - java.lang.Long.MAX_VALUE, - HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue(), - false) - fileGroupReader.initRecordIterators() - val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]] - iterator.asScala - }) - } - val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields) - logInfo("Start to do bucket rescale for " + rescalePartitionsMap) - val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write( - sparkSession.sqlContext, - SaveMode.Append, - finalConfig, - dataFrame) + val res: RDD[InternalRow] = if (allFileSlice.isEmpty) { + spark.sparkContext.emptyRDD + } else { + val serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields) + val latestInstantTime = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get() + + spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => { + // instantiate other supporting cast + val readerSchema = serializableTableSchemaWithMetaFields.get + val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath) + val internalSchemaOption: Option[InternalSchema] = Option.empty() + // instantiate FG reader + val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(), + metaClient.getStorage, + basePath.toString, + latestInstantTime.requestedTime(), + fileSlice, + readerSchema, + readerSchema, + internalSchemaOption, // not support evolution of schema for now + metaClient, + metaClient.getTableConfig.getProps, + 0, + java.lang.Long.MAX_VALUE, + HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue(), + false) + fileGroupReader.initRecordIterators() + val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]] + iterator.asScala + }) + } + val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields) + logInfo("Start to do bucket rescale for " + rescalePartitionsMap) + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write( + sparkSession.sqlContext, + SaveMode.Append, + finalConfig, + dataFrame) - val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: 
$dryRun" + val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: $dryRun" - val archived = PartitionBucketIndexHashingConfig.archiveHashingConfigIfNecessary(metaClient) + val archived = PartitionBucketIndexHashingConfig.archiveHashingConfigIfNecessary(metaClient) - Seq(Row("SUCCESS", "OVERWRITE", details)) + Seq(Row("SUCCESS", "OVERWRITE", details)) + } } /** @@ -270,39 +284,47 @@ class PartitionBucketIndexManager extends BaseProcedure /** * Handle the add operation. */ - private def handleAdd(metaClient: HoodieTableMetaClient, expression: String, dryRun: Boolean): Seq[Row] = { - // In a real implementation, this would call PartitionBucketIndexManager - // For now, just return a placeholder result - val details = s"Expression: $expression, Dry Run: $dryRun" - - // Here would be the actual call to PartitionBucketIndexManager - // PartitionBucketIndexManager.addExpression(metaClient, expression, dryRun) - - Seq(Row("SUCCESS", "ADD", details)) + private def handleAdd(config: Map[String, String], + context: HoodieEngineContext, + metaClient: HoodieTableMetaClient, + expression: String, + dryRun: Boolean): Seq[Row] = { + logInfo("Handle Add Expression Operation") + + val hashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient) + val latestExpression = hashingConfig.getExpressions + + handleOverwrite(config, context, metaClient, s"""$expression;$latestExpression""", hashingConfig.getDefaultBucketNumber, + hashingConfig.getRule, dryRun) } /** * Handle the rollback operation. */ - private def handleRollback(metaClient: HoodieTableMetaClient, instantTime: String): Seq[Row] = { - // In a real implementation, this would call PartitionBucketIndexManager - // For now, just return a placeholder result - val details = s"Rolled back bucket rescale action: $instantTime" - - // Here would be the actual call to PartitionBucketIndexManager - // PartitionBucketIndexManager.rollback(metaClient, instantTime) - - Seq(Row("SUCCESS", "ROLLBACK", details)) + private def handleRollback(writeClient: SparkRDDWriteClient[_], metaClient: HoodieTableMetaClient, instantTime: String): Seq[Row] = { + logInfo("Handle Add Expression Operation") + val hashingConfig = PartitionBucketIndexHashingConfig.loadHashingConfig(metaClient.getStorage, PartitionBucketIndexHashingConfig.getHashingConfigPath(metaClient.getBasePath.toString, instantTime)) + if (hashingConfig.isPresent) { + logInfo("Start to rollback " + instantTime) + writeClient.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false") + writeClient.getConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS, hashingConfig.get().getExpressions) + val result = writeClient.rollback(instantTime) + Seq(Row("SUCCESS", "ROLLBACK", s"""$result to rollback $instantTime""")) + } else { + Seq(Row("FAILED", "ROLLBACK", null)) + } } /** * Handle the show-config operation. 
*/ private def handleShowConfig(metaClient: HoodieTableMetaClient): Seq[Row] = { - - - - Seq(Row("SUCCESS", "SHOW_CONFIG", null)) + logInfo("Handle showConfig operation") + val hashingConfigs = PartitionBucketIndexHashingConfig.getAllHashingConfig(metaClient) + val res = hashingConfigs.asScala.map(config => { + config.toString + }).mkString("\\n") + Seq(Row("SUCCESS", "SHOW_CONFIG", res)) } override def build: Procedure = new PartitionBucketIndexManager diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala index bca3d044617..6ae1a9c752f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala @@ -19,12 +19,14 @@ package org.apache.spark.sql.hudi.dml import org.apache.hudi.common.model.{HoodieFailedWritesCleaningPolicy, PartitionBucketIndexHashingConfig} import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils +import org.apache.hudi.storage.StoragePath import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { @@ -91,7 +93,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2" val rule = "regex" val defaultBucketNumber = 1 - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") checkAnswer(s"select id, name, price, ts, dt from $tableName")( Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), @@ -132,6 +134,64 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { } } + test("Test Bucket Rescale Dry Run") { + withSQLConf( + "hoodie.datasource.write.operation" -> "bulk_insert", + "hoodie.bulkinsert.shuffle.parallelism" -> "2") { + withTempDir { tmp => + withTable(generateTableName) { tableName => + val tablePath = s"""${tmp.getCanonicalPath}/$tableName""" + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name', + | hoodie.bucket.index.num.buckets = 1, + | hoodie.datasource.write.row.writer.enable = 'true') + | partitioned by (dt) + | location '${tablePath}' + | """.stripMargin) + + // Note: Do not write the field alias, the partition field must be placed last. 
+ spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (11, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (22, 'a2', 20, 2000, "2021-01-06") + | """.stripMargin) + + // upgrade to partition level bucket index and rescale dt=2021-01-05 from 1 to 2 + val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-08),2" + val rule = "regex" + val defaultBucketNumber = 1 + val sql = s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)" + val resExpect = ArrayBuffer( + s""" + |dt=2021-01-05 => 2 + |""".stripMargin) + + checkAnswer(sql)( + Seq("SUCCESS", "DRY_RUN_OVERWRITE", s"""DETAILS:[$resExpect]""") + ) + + val metaClient = createMetaClient(spark, tablePath) + // check there is no active hashing config + assert(!metaClient.getStorage.exists(new StoragePath(metaClient.getHashingMetadataConfigPath))) + }}}} + test("Test Bulk Insert Into Partition Bucket Index Table Without Rescale") { withSQLConf( "hoodie.datasource.write.operation" -> "bulk_insert", @@ -176,7 +236,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { val expressions = "dt=2021\\-01\\-07,2" val rule = "regex" val defaultBucketNumber = 1 - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") checkAnswer(s"select id, name, price, ts, dt from $tableName")( Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), @@ -270,7 +330,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2" val rule = "regex" val defaultBucketNumber = 1 - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") checkAnswer(s"select id, name, price, ts, dt from $tableName")( Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), @@ -356,7 +416,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { // do bucket rescale commit 2 val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),4" val rule = "regex" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") // do commit 3 update id = 1111 spark.sql( @@ -368,7 +428,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { // do bucket rescale commit 4 val expressions2 = "dt=(2021\\-01\\-05|2021\\-01\\-07),3" val rule2 = "regex" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule2', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule2', bucketNumber => $defaultBucketNumber, dryRun => false)") // do commit 5 update id = 1111 
spark.sql( @@ -456,10 +516,10 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2" val rule = "regex" val defaultBucketNumber = 1 - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") val expressions2 = "dt=(2021\\-01\\-05|2021\\-01\\-07),3" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") // delete latest replace commit val replaceCommit = metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant().get() @@ -547,25 +607,93 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2" val rule = "regex" val defaultBucketNumber = 1 - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") val expressions2 = "dt=(2021\\-01\\-05|2021\\-01\\-07),3" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") val expressions3 = "dt=(2021\\-01\\-05|2021\\-01\\-07),4" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions3', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions3', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") val expressions4 = "dt=(2021\\-01\\-05|2021\\-01\\-07),5" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions4', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions4', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") val expressions5 = "dt=(2021\\-01\\-05|2021\\-01\\-07),6" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions5', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions5', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") val expressions6 = "dt=(2021\\-01\\-05|2021\\-01\\-07),6" - spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions6', rule => '$rule', bucketNumber => $defaultBucketNumber)") + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions6', rule => '$rule', bucketNumber => 
$defaultBucketNumber, dryRun => false)") assert(PartitionBucketIndexHashingConfig.getArchiveHashingConfigInstants(metaClient).size() == 2) assert(PartitionBucketIndexHashingConfig.getCommittedHashingConfigInstants(metaClient).size() == 4) + + // take care of showConfig command + val expected = PartitionBucketIndexHashingConfig.getAllHashingConfig(metaClient).asScala.map(config => { + config.toString + }).mkString("\\n") + checkAnswer(s"call partition_bucket_index_manager(table => '$tableName', showConfig => true)")( + Seq("SUCCESS", "SHOW_CONFIG", expected) + ) + } + } + } + } + + test("Test Add Expression and Rollback Command") { + withSQLConf( + "hoodie.datasource.write.operation" -> "upsert") { + withTempDir { tmp => + withTable(generateTableName) { tableName => + val tablePath = tmp.getCanonicalPath + "/" + tableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | type = 'cow', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name', + | hoodie.bucket.index.num.buckets = 1) + | partitioned by (dt) + | location '$tablePath' + | """.stripMargin) + + // Note: Do not write the field alias, the partition field must be placed last. + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (11, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (22, 'a2', 20, 2000, "2021-01-06") + | """.stripMargin) + val metaClient = createMetaClient(spark, tablePath) + + val expressions = "dt=(2021\\-01\\-07|2021\\-01\\-08),2" + val rule = "regex" + val defaultBucketNumber = 1 + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)") + + val expressions2 = "dt=(2021\\-01\\-09|2021\\-01\\-10),3" + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', add => '$expressions2', dryRun => false)") + + val actualExpression = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient).getExpressions + val expectedExpression = s"""$expressions2;$expressions""" + assert(actualExpression.equals(expectedExpression.replace("\\",""))) + val commit = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline.lastInstant() + + // rollback latest committed hashing config + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', rollback => '${commit.get().requestedTime()}')") + val actualExpression2 = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient).getExpressions + assert(actualExpression2.equals(expressions.replace("\\",""))) } } }
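
For reference, the operations added by this patch can be exercised end to end with the procedure calls below. This is a minimal usage sketch distilled from the tests in this commit; the table name, bucket expressions, and instant time are placeholders, while the parameter names (overwrite, add, rollback, showConfig, dryRun, rule, bucketNumber) come from the procedure definition in this patch.

  -- preview a rescale without writing any data (dryRun defaults to true)
  call partition_bucket_index_manager(table => 'tbl', overwrite => 'dt=(2021\-01\-05),2', rule => 'regex', bucketNumber => 1);

  -- apply the rescale
  call partition_bucket_index_manager(table => 'tbl', overwrite => 'dt=(2021\-01\-05),2', rule => 'regex', bucketNumber => 1, dryRun => false);

  -- prepend a new expression to the latest committed hashing config
  call partition_bucket_index_manager(table => 'tbl', add => 'dt=(2021\-01\-09),3', dryRun => false);

  -- list all committed and archived hashing configs
  call partition_bucket_index_manager(table => 'tbl', showConfig => true);

  -- roll back the hashing config written by a given replacecommit instant
  call partition_bucket_index_manager(table => 'tbl', rollback => '<instant_time>');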
