danny0405 commented on code in PR #13106:
URL: https://github.com/apache/hudi/pull/13106#discussion_r2038595048


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala:
##########
@@ -179,76 +181,88 @@ class PartitionBucketIndexManager extends BaseProcedure
 
     // get partitions need to be rescaled
     val rescalePartitionsMap = 
getDifferentPartitions(partition2BucketWithNewHashingConfig.asScala, 
partition2BucketWithLatestHashingConfig.asScala)
-    val partitionsToRescale = rescalePartitionsMap.keys
-
-    // get all fileSlices need to read
-    val allFilesMap = FSUtils.getFilesInPartitions(context, 
metaClient.getStorage(), 
HoodieMetadataConfig.newBuilder.enable(mdtEnable).build,
-      metaClient.getBasePath.toString, partitionsToRescale.map(relative => {
-        new StoragePath(basePath, relative)
-      }).map(storagePath => storagePath.toString).toArray)
-    val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList
-    val view = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline, files.asJava)
-    val allFileSlice = partitionsToRescale.flatMap(partitionPath => {
-      view.getLatestFileSlices(partitionPath).iterator().asScala
-    }).toList
-
-    // read all fileSlice para and get DF
-    var tableSchemaWithMetaFields: Schema = null
-    try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new 
TableSchemaResolver(metaClient).getTableAvroSchema(false), false)
-    catch {
-      case e: Exception =>
-        throw new HoodieException("Failed to get table schema during 
clustering", e)
-    }
+    if (dryRun) {
+      logInfo("Dry run OVERWRITE")
+      val content = rescalePartitionsMap.map(entry => {
+        val details =
+          s"""
+             |${entry._1} => ${entry._2}
+             |""".stripMargin
+        details
+      }).toSeq.mkString(";")
+      Seq(Row("SUCCESS", "DRY_RUN_OVERWRITE", content))
+    } else {
+      logInfo("Perform OVERWRITE with dry-run disabled.")
+      val partitionsToRescale = rescalePartitionsMap.keys
+      // get all fileSlices need to read
+      val allFilesMap = FSUtils.getFilesInPartitions(context, 
metaClient.getStorage(), 
HoodieMetadataConfig.newBuilder.enable(mdtEnable).build,
+        metaClient.getBasePath.toString, partitionsToRescale.map(relative => {
+          new StoragePath(basePath, relative)
+        }).map(storagePath => storagePath.toString).toArray)
+      val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList
+      val view = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline, files.asJava)
+      val allFileSlice = partitionsToRescale.flatMap(partitionPath => {
+        view.getLatestFileSlices(partitionPath).iterator().asScala
+      }).toList
+
+      // read all fileSlice para and get DF
+      var tableSchemaWithMetaFields: Schema = null
+      try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new 
TableSchemaResolver(metaClient).getTableAvroSchema(false), false)
+      catch {
+        case e: Exception =>
+          throw new HoodieException("Failed to get table schema during 
clustering", e)
+      }
 
-    // broadcast reader context.
-    val broadcastManager = new SparkBroadcastManager(context, metaClient)
-    broadcastManager.prepareAndBroadcast()
-    val sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
+      // broadcast reader context.
+      val broadcastManager = new SparkBroadcastManager(context, metaClient)
+      broadcastManager.prepareAndBroadcast()
+      val sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
 
-    val res: RDD[InternalRow] = if (allFileSlice.isEmpty) {
-      spark.sparkContext.emptyRDD
-    } else {
-      val serializableTableSchemaWithMetaFields = new 
SerializableSchema(tableSchemaWithMetaFields)
-      val latestInstantTime = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
-
-      spark.sparkContext.parallelize(allFileSlice, 
allFileSlice.size).flatMap(fileSlice => {
-        // instantiate other supporting cast
-        val readerSchema = serializableTableSchemaWithMetaFields.get
-        val readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(basePath)
-        val internalSchemaOption: Option[InternalSchema] = Option.empty()
-        // instantiate FG reader
-        val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
-          metaClient.getStorage,
-          basePath.toString,
-          latestInstantTime.requestedTime(),
-          fileSlice,
-          readerSchema,
-          readerSchema,
-          internalSchemaOption, // not support evolution of schema for now
-          metaClient,
-          metaClient.getTableConfig.getProps,
-          0,
-          java.lang.Long.MAX_VALUE,
-          HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue(),
-          false)
-        fileGroupReader.initRecordIterators()
-        val iterator = 
fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
-        iterator.asScala
-      })
-    }
-    val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, 
res, sparkSchemaWithMetaFields)
-    logInfo("Start to do bucket rescale for " + rescalePartitionsMap)
-    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
-      sparkSession.sqlContext,
-      SaveMode.Append,
-      finalConfig,
-      dataFrame)
+      val res: RDD[InternalRow] = if (allFileSlice.isEmpty) {
+        spark.sparkContext.emptyRDD
+      } else {
+        val serializableTableSchemaWithMetaFields = new 
SerializableSchema(tableSchemaWithMetaFields)
+        val latestInstantTime = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+
+        spark.sparkContext.parallelize(allFileSlice, 
allFileSlice.size).flatMap(fileSlice => {
+          // instantiate other supporting cast
+          val readerSchema = serializableTableSchemaWithMetaFields.get
+          val readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(basePath)
+          val internalSchemaOption: Option[InternalSchema] = Option.empty()
+          // instantiate FG reader
+          val fileGroupReader = new 
HoodieFileGroupReader(readerContextOpt.get(),
+            metaClient.getStorage,
+            basePath.toString,
+            latestInstantTime.requestedTime(),
+            fileSlice,
+            readerSchema,
+            readerSchema,
+            internalSchemaOption, // not support evolution of schema for now
+            metaClient,
+            metaClient.getTableConfig.getProps,
+            0,
+            java.lang.Long.MAX_VALUE,
+            HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue(),
+            false)
+          fileGroupReader.initRecordIterators()
+          val iterator = 
fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
+          iterator.asScala
+        })
+      }
+      val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, 
res, sparkSchemaWithMetaFields)
+      logInfo("Start to do bucket rescale for " + rescalePartitionsMap)
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
+        sparkSession.sqlContext,
+        SaveMode.Append,
+        finalConfig,
+        dataFrame)
 
-    val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry 
Run: $dryRun"
+      val details = s"Expression: $expression, Bucket Number: $bucketNumber, 
Dry Run: $dryRun"

Review Comment:
   Is the option value supplied by users case-sensitive?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to