hudi-agent commented on code in PR #19040: URL: https://github.com/apache/hudi/pull/19040#discussion_r3436456871
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairClusteringPlanProcedure.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.avro.model.{HoodieClusteringPlan, HoodieRequestedReplaceMetadata} +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.model.WriteOperationType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.io.util.FileIOUtils +import org.apache.hudi.storage.StoragePath + +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import 
org.apache.spark.util.SerializableConfiguration + +import java.util +import java.util.Locale +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +class RepairClusteringPlanProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "instant", DataTypes.StringType), + ProcedureParameter.optional(3, "op", DataTypes.StringType), + ProcedureParameter.optional(4, "invalid_parquet_files", DataTypes.StringType, ""), + ProcedureParameter.optional(5, "validation_parallelism", DataTypes.IntegerType, 100), + ProcedureParameter.optional(6, "need_delete", DataTypes.BooleanType, false), + ProcedureParameter.optional(7, "dry_run", DataTypes.BooleanType, true), + ProcedureParameter.optional(8, "backup", DataTypes.BooleanType, true), + ProcedureParameter.optional(9, "allow_empty_plan", DataTypes.BooleanType, false), + ProcedureParameter.optional(10, "options", DataTypes.StringType, "") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("reason", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val instantTime = getArgValueOrDefault(args, 
PARAMETERS(2)).get.toString + val invalidParquetFiles = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("") + val operation = getOperation(getArgValueOrDefault(args, PARAMETERS(3)), invalidParquetFiles) + val parallelism = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val needDelete = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] + val dryRun = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[Boolean] + val shouldBackup = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[Boolean] + val allowEmptyPlan = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Boolean] + val options = getArgValueOrDefault(args, PARAMETERS(10)).map(_.toString).getOrElse("") + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + val (_, clusteringPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime) + val candidates = getRepairCandidates(operation, invalidParquetFiles, clusteringPlan, parallelism) + if (candidates.isEmpty) { + Seq.empty + } else { + val candidatePaths = candidates.map(_.path).toSet + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(clusteringPlan, candidatePaths)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. 
" + + "Set allow_empty_plan => true to allow this operation.") + } + + if (dryRun) { + candidates.map(candidate => + Row(instantTime, candidate.path, RepairClusteringPlanProcedure.WOULD_REMOVE_FROM_PLAN, false, candidate.reason)) + } else { + var client: SparkRDDWriteClient[_] = null + var repairedPaths = Set.empty[String] + val confs = if (options.trim.isEmpty) Map.empty[String, String] else HoodieCLIUtils.extractOptions(options) + val tableNameOpt = tableName.map(_.toString) + try { + client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs, tableNameOpt) + val transactionOwner = HOption.of(metaClient.getInstantGenerator.createNewInstant( + HoodieInstant.State.REQUESTED, + clusteringPlanAction(metaClient, instantTime), + instantTime)) + val txnManager = client.getTransactionManager + txnManager.beginStateChange(transactionOwner, metaClient.reloadActiveTimeline().filterCompletedInstants().lastInstant()) + try { + val latestMetaClient = createMetaClient(jsc, basePath) + val (latestInstant, latestPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(latestMetaClient, instantTime) + val latestPlanFiles = RepairClusteringPlanProcedure.getPlanDataFiles(latestPlan).toSet + val filesToRepair = candidatePaths.intersect(latestPlanFiles) + if (filesToRepair.nonEmpty) { + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(latestPlan, filesToRepair)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. 
" + "Set allow_empty_plan => true to allow this operation.") + } + + val repairedPlan = RepairClusteringPlanProcedure.pruneClusteringPlan(latestPlan, filesToRepair) + RepairClusteringPlanProcedure.rewriteRequestedClusteringPlan( + latestMetaClient, latestInstant, repairedPlan, latestPlan.getExtraMetadata, shouldBackup) + repairedPaths = filesToRepair + } + } finally { + txnManager.endStateChange(transactionOwner) + } + } finally { + if (client != null) { + client.close() + } + } + + val deleteResults = if (needDelete) { + deletePhysicalFiles(repairedPaths.toSeq) + } else { + Map.empty[String, Boolean] + } + + candidates.map { candidate => + val repaired = repairedPaths.contains(candidate.path) + Row( + instantTime, + candidate.path, + if (repaired) RepairClusteringPlanProcedure.REMOVED_FROM_PLAN else RepairClusteringPlanProcedure.NOT_FOUND_IN_PLAN, + deleteResults.getOrElse(candidate.path, false), + candidate.reason) + } + } + } + } + + private def getOperation(operationArg: Option[Any], invalidParquetFiles: String): String = { + operationArg + .map(_.toString.trim) + .filter(_.nonEmpty) + .getOrElse { + if (invalidParquetFiles.trim.nonEmpty) { + RepairClusteringPlanProcedure.DELETE_OPERATION + } else { + RepairClusteringPlanProcedure.VALIDATE_DELETE_OPERATION + } + } + .toLowerCase(Locale.ROOT) + } + + private def getRepairCandidates(operation: String, + invalidParquetFiles: String, Review Comment: 🤖 The validation only marks a file invalid when the exception message contains the literal substring `is not a Parquet file`. Truncated footers, files whose magic bytes are intact but whose footer metadata is corrupt, IOExceptions from missing/permission-denied files, and other corruption modes won't match this string and will be silently treated as valid. Could we broaden this — e.g. treat any non-retriable read failure as invalid (perhaps after a sanity existence check), or at least surface the unmatched exception in the output rather than swallowing it?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairClusteringPlanProcedure.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.avro.model.{HoodieClusteringPlan, HoodieRequestedReplaceMetadata} +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.model.WriteOperationType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.io.util.FileIOUtils +import org.apache.hudi.storage.StoragePath + +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import org.apache.spark.util.SerializableConfiguration + +import java.util +import java.util.Locale +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +class RepairClusteringPlanProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "instant", DataTypes.StringType), + ProcedureParameter.optional(3, "op", DataTypes.StringType), + ProcedureParameter.optional(4, "invalid_parquet_files", DataTypes.StringType, ""), + ProcedureParameter.optional(5, "validation_parallelism", DataTypes.IntegerType, 100), + ProcedureParameter.optional(6, "need_delete", DataTypes.BooleanType, false), + ProcedureParameter.optional(7, "dry_run", DataTypes.BooleanType, true), + ProcedureParameter.optional(8, "backup", DataTypes.BooleanType, true), + 
ProcedureParameter.optional(9, "allow_empty_plan", DataTypes.BooleanType, false), + ProcedureParameter.optional(10, "options", DataTypes.StringType, "") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("reason", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.toString + val invalidParquetFiles = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("") + val operation = getOperation(getArgValueOrDefault(args, PARAMETERS(3)), invalidParquetFiles) + val parallelism = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val needDelete = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] + val dryRun = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[Boolean] + val shouldBackup = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[Boolean] + val allowEmptyPlan = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Boolean] + val options = getArgValueOrDefault(args, PARAMETERS(10)).map(_.toString).getOrElse("") + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + val (_, clusteringPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime) + val candidates = 
getRepairCandidates(operation, invalidParquetFiles, clusteringPlan, parallelism) + if (candidates.isEmpty) { + Seq.empty + } else { + val candidatePaths = candidates.map(_.path).toSet + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(clusteringPlan, candidatePaths)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. " + + "Set allow_empty_plan => true to allow this operation.") + } + + if (dryRun) { + candidates.map(candidate => + Row(instantTime, candidate.path, RepairClusteringPlanProcedure.WOULD_REMOVE_FROM_PLAN, false, candidate.reason)) + } else { + var client: SparkRDDWriteClient[_] = null + var repairedPaths = Set.empty[String] + val confs = if (options.trim.isEmpty) Map.empty[String, String] else HoodieCLIUtils.extractOptions(options) + val tableNameOpt = tableName.map(_.toString) + try { + client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs, tableNameOpt) + val transactionOwner = HOption.of(metaClient.getInstantGenerator.createNewInstant( + HoodieInstant.State.REQUESTED, + clusteringPlanAction(metaClient, instantTime), + instantTime)) + val txnManager = client.getTransactionManager + txnManager.beginStateChange(transactionOwner, metaClient.reloadActiveTimeline().filterCompletedInstants().lastInstant()) + try { + val latestMetaClient = createMetaClient(jsc, basePath) + val (latestInstant, latestPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(latestMetaClient, instantTime) + val latestPlanFiles = RepairClusteringPlanProcedure.getPlanDataFiles(latestPlan).toSet + val filesToRepair = candidatePaths.intersect(latestPlanFiles) + if (filesToRepair.nonEmpty) { + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(latestPlan, filesToRepair)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. 
" + + "Set allow_empty_plan => true to allow this operation.") + } + + val repairedPlan = RepairClusteringPlanProcedure.pruneClusteringPlan(latestPlan, filesToRepair) + RepairClusteringPlanProcedure.rewriteRequestedClusteringPlan( + latestMetaClient, latestInstant, repairedPlan, latestPlan.getExtraMetadata, shouldBackup) + repairedPaths = filesToRepair + } + } finally { + txnManager.endStateChange(transactionOwner) + } + } finally { + if (client != null) { + client.close() + } + } + + val deleteResults = if (needDelete) { + deletePhysicalFiles(repairedPaths.toSeq) + } else { + Map.empty[String, Boolean] + } + + candidates.map { candidate => + val repaired = repairedPaths.contains(candidate.path) + Row( + instantTime, + candidate.path, + if (repaired) RepairClusteringPlanProcedure.REMOVED_FROM_PLAN else RepairClusteringPlanProcedure.NOT_FOUND_IN_PLAN, + deleteResults.getOrElse(candidate.path, false), + candidate.reason) + } + } + } + } + + private def getOperation(operationArg: Option[Any], invalidParquetFiles: String): String = { + operationArg + .map(_.toString.trim) + .filter(_.nonEmpty) + .getOrElse { + if (invalidParquetFiles.trim.nonEmpty) { + RepairClusteringPlanProcedure.DELETE_OPERATION + } else { + RepairClusteringPlanProcedure.VALIDATE_DELETE_OPERATION + } + } + .toLowerCase(Locale.ROOT) + } + + private def getRepairCandidates(operation: String, + invalidParquetFiles: String, + clusteringPlan: HoodieClusteringPlan, + parallelism: Int): Seq[RepairCandidate] = { + operation match { + case RepairClusteringPlanProcedure.DELETE_OPERATION => + parseInvalidFiles(invalidParquetFiles).map(path => + RepairCandidate(path, RepairClusteringPlanProcedure.USER_REQUESTED)) + case RepairClusteringPlanProcedure.VALIDATE_DELETE_OPERATION | RepairClusteringPlanProcedure.CHECK_AND_DELETE_OPERATION => + validateInvalidParquetFiles(clusteringPlan, parallelism).map(path => + RepairCandidate(path, RepairClusteringPlanProcedure.NOT_PARQUET_FILE)) + case unsupported => + throw new 
UnsupportedOperationException( + s"Unsupported operation: '$unsupported'. Supported operations: delete, validate_delete, checkanddelete") + } + } + + private def parseInvalidFiles(filesParam: String): Seq[String] = { + require(filesParam != null && filesParam.trim.nonEmpty, + "Please set the files to be removed from the clustering plan via the parameter invalid_parquet_files.") + + filesParam.split(",") + .map(_.trim) + .filter(_.nonEmpty) + .distinct + .toSeq + } + + private def validateInvalidParquetFiles(clusteringPlan: HoodieClusteringPlan, + parallelism: Int): Seq[String] = { + val allFiles = RepairClusteringPlanProcedure.getPlanDataFiles(clusteringPlan).toSeq + if (allFiles.isEmpty) { + Seq.empty + } else { + val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration()) + val rddParallelism = Math.max(1, Math.min(allFiles.size, parallelism)) + jsc.parallelize(allFiles.asJava, rddParallelism).rdd.filter { path => + var isInvalid = false + if (path.endsWith(".parquet")) { + try { + ParquetFileReader.readFooter(serHadoopConf.value, new Path(path), SKIP_ROW_GROUPS).getFileMetaData + } catch { + case e: Exception => + isInvalid = Option(e.getMessage).exists(_.contains("is not a Parquet file")) + } + } + isInvalid + }.collect().toSeq + } + } + + private def deletePhysicalFiles(filesToDelete: Seq[String]): Map[String, Boolean] = { + val storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()) + filesToDelete.map { filePath => + val path = new Path(filePath) + val fs = HadoopFSUtils.getFs(path, storageConf.unwrap()) + val deleted = if (fs.exists(path)) { + if (!fs.delete(path, false)) { + throw new HoodieException(s"Failed to delete invalid parquet file after repairing clustering plan: $filePath") + } + true + } else { + false + } + filePath -> deleted + }.toMap + } + + private def clusteringPlanAction(metaClient: HoodieTableMetaClient, instantTime: String): String = { + 
RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime)._1.getAction + } + + override def build: Procedure = new RepairClusteringPlanProcedure() +} + +object RepairClusteringPlanProcedure { + val NAME = "repair_clustering_plan" + + val DELETE_OPERATION = "delete" + val VALIDATE_DELETE_OPERATION = "validate_delete" + val CHECK_AND_DELETE_OPERATION = "checkanddelete" + + val WOULD_REMOVE_FROM_PLAN = "WOULD_REMOVE_FROM_PLAN" + val REMOVED_FROM_PLAN = "REMOVED_FROM_PLAN" + val NOT_FOUND_IN_PLAN = "NOT_FOUND_IN_PLAN" + val USER_REQUESTED = "USER_REQUESTED" + val NOT_PARQUET_FILE = "NOT_PARQUET_FILE" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): ProcedureBuilder = new RepairClusteringPlanProcedure() + } + + private def getRequestedClusteringPlan(metaClient: HoodieTableMetaClient, + instantTime: String): (HoodieInstant, HoodieClusteringPlan) = { + val pendingPlan = ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala + .find(plan => plan.getLeft.requestedTime == instantTime && plan.getLeft.isRequested) + .getOrElse { + throw new HoodieException( + s"The requested clustering plan for instant $instantTime does not exist. 
Modifications are not supported.") + } + + (pendingPlan.getLeft, pendingPlan.getRight) + } + + private def getPlanDataFiles(plan: HoodieClusteringPlan): Iterable[String] = { + Option(plan.getInputGroups) + .map(_.asScala) + .getOrElse(Seq.empty) + .flatMap(group => Option(group.getSlices).map(_.asScala).getOrElse(Seq.empty)) + .map(_.getDataFilePath) + .filter(path => path != null && path.nonEmpty) + } + + private def hasRetainedInputGroup(plan: HoodieClusteringPlan, filesToRemove: Set[String]): Boolean = { + Option(plan.getInputGroups) + .map(_.asScala) + .getOrElse(Seq.empty) + .exists { group => + Option(group.getSlices) + .map(_.asScala) + .getOrElse(Seq.empty) + .exists(slice => !filesToRemove.contains(slice.getDataFilePath)) + } + } + + private def pruneClusteringPlan(plan: HoodieClusteringPlan, + filesToRemove: Set[String]): HoodieClusteringPlan = { + val retainedGroups = Option(plan.getInputGroups) + .map(_.asScala) + .getOrElse(Seq.empty) + .map { group => + val retainedSlices = Option(group.getSlices) + .map(_.asScala) + .getOrElse(Seq.empty) + .filter(slice => !filesToRemove.contains(slice.getDataFilePath)) + .asJava + group.setSlices(retainedSlices) + group + } + .filter(group => group.getSlices != null && !group.getSlices.isEmpty) + .asJava Review Comment: 🤖 If `shouldBackup` is false and `saveToPendingClusterCommit`/`saveToPendingReplaceCommit` throws after `deleteInstantFileIfExists` has already removed the original instant, `restoreRequestedInstantIfNeeded` short-circuits on `shouldBackup` and the requested instant file is permanently lost — the user effectively loses the clustering plan. Could we either always take a backup internally (independent of the user flag), or fail fast before deletion when `shouldBackup=false`? <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairClusteringPlanProcedure.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.avro.model.{HoodieClusteringPlan, HoodieRequestedReplaceMetadata} +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.model.WriteOperationType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.io.util.FileIOUtils +import org.apache.hudi.storage.StoragePath + +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import org.apache.spark.util.SerializableConfiguration + +import java.util +import java.util.Locale +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +class RepairClusteringPlanProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "instant", DataTypes.StringType), + ProcedureParameter.optional(3, "op", DataTypes.StringType), + ProcedureParameter.optional(4, "invalid_parquet_files", DataTypes.StringType, ""), + ProcedureParameter.optional(5, "validation_parallelism", DataTypes.IntegerType, 100), + ProcedureParameter.optional(6, "need_delete", DataTypes.BooleanType, false), + ProcedureParameter.optional(7, "dry_run", DataTypes.BooleanType, true), + ProcedureParameter.optional(8, "backup", DataTypes.BooleanType, true), + 
ProcedureParameter.optional(9, "allow_empty_plan", DataTypes.BooleanType, false), + ProcedureParameter.optional(10, "options", DataTypes.StringType, "") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("reason", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.toString + val invalidParquetFiles = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("") + val operation = getOperation(getArgValueOrDefault(args, PARAMETERS(3)), invalidParquetFiles) + val parallelism = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val needDelete = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] + val dryRun = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[Boolean] + val shouldBackup = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[Boolean] + val allowEmptyPlan = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Boolean] + val options = getArgValueOrDefault(args, PARAMETERS(10)).map(_.toString).getOrElse("") + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + val (_, clusteringPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime) + val candidates = 
getRepairCandidates(operation, invalidParquetFiles, clusteringPlan, parallelism) + if (candidates.isEmpty) { + Seq.empty + } else { + val candidatePaths = candidates.map(_.path).toSet + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(clusteringPlan, candidatePaths)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. " + + "Set allow_empty_plan => true to allow this operation.") + } + + if (dryRun) { + candidates.map(candidate => + Row(instantTime, candidate.path, RepairClusteringPlanProcedure.WOULD_REMOVE_FROM_PLAN, false, candidate.reason)) + } else { + var client: SparkRDDWriteClient[_] = null + var repairedPaths = Set.empty[String] + val confs = if (options.trim.isEmpty) Map.empty[String, String] else HoodieCLIUtils.extractOptions(options) Review Comment: 🤖 Minor UX inconsistency: in dry-run mode every candidate is reported as `WOULD_REMOVE_FROM_PLAN`, but the non-dry-run path correctly distinguishes `NOT_FOUND_IN_PLAN` for files the user listed that aren't actually in the plan. Could dry-run also intersect with the plan's file set so the preview matches what the real run would do? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairClusteringPlanProcedure.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.avro.model.{HoodieClusteringPlan, HoodieRequestedReplaceMetadata} +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.model.WriteOperationType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.io.util.FileIOUtils +import org.apache.hudi.storage.StoragePath + +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import org.apache.spark.util.SerializableConfiguration + +import java.util +import java.util.Locale +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +class RepairClusteringPlanProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "instant", DataTypes.StringType), + ProcedureParameter.optional(3, "op", DataTypes.StringType), + 
ProcedureParameter.optional(4, "invalid_parquet_files", DataTypes.StringType, ""), + ProcedureParameter.optional(5, "validation_parallelism", DataTypes.IntegerType, 100), + ProcedureParameter.optional(6, "need_delete", DataTypes.BooleanType, false), + ProcedureParameter.optional(7, "dry_run", DataTypes.BooleanType, true), + ProcedureParameter.optional(8, "backup", DataTypes.BooleanType, true), + ProcedureParameter.optional(9, "allow_empty_plan", DataTypes.BooleanType, false), + ProcedureParameter.optional(10, "options", DataTypes.StringType, "") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("reason", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.toString + val invalidParquetFiles = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("") + val operation = getOperation(getArgValueOrDefault(args, PARAMETERS(3)), invalidParquetFiles) + val parallelism = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val needDelete = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] + val dryRun = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[Boolean] + val shouldBackup = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[Boolean] + val allowEmptyPlan = 
getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Boolean] + val options = getArgValueOrDefault(args, PARAMETERS(10)).map(_.toString).getOrElse("") + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + val (_, clusteringPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime) Review Comment: 🤖 nit: the instant is discarded with `_` here but then retrieved a second time inside `clusteringPlanAction` (line 246 of the new file), which just calls `getRequestedClusteringPlan` again — and therefore rescans all pending clustering plans via `ClusteringUtils.getAllPendingClusteringPlans`. Could you capture it as `clusteringInstant` instead — `val (clusteringInstant, clusteringPlan) = ...` — and then pass `clusteringInstant.getAction` directly on line 115? That would avoid the redundant scan and let you drop the `clusteringPlanAction` helper entirely. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairClusteringPlanProcedure.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.avro.model.{HoodieClusteringPlan, HoodieRequestedReplaceMetadata} +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.model.WriteOperationType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.io.util.FileIOUtils +import org.apache.hudi.storage.StoragePath + +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import org.apache.spark.util.SerializableConfiguration + +import java.util +import java.util.Locale +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +class RepairClusteringPlanProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "instant", DataTypes.StringType), + ProcedureParameter.optional(3, "op", DataTypes.StringType), + ProcedureParameter.optional(4, "invalid_parquet_files", DataTypes.StringType, ""), + ProcedureParameter.optional(5, "validation_parallelism", DataTypes.IntegerType, 100), + ProcedureParameter.optional(6, "need_delete", DataTypes.BooleanType, false), + ProcedureParameter.optional(7, "dry_run", DataTypes.BooleanType, true), + ProcedureParameter.optional(8, "backup", DataTypes.BooleanType, true), + 
ProcedureParameter.optional(9, "allow_empty_plan", DataTypes.BooleanType, false), + ProcedureParameter.optional(10, "options", DataTypes.StringType, "") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("reason", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.toString + val invalidParquetFiles = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("") + val operation = getOperation(getArgValueOrDefault(args, PARAMETERS(3)), invalidParquetFiles) + val parallelism = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val needDelete = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] + val dryRun = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[Boolean] + val shouldBackup = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[Boolean] + val allowEmptyPlan = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Boolean] + val options = getArgValueOrDefault(args, PARAMETERS(10)).map(_.toString).getOrElse("") + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + val (_, clusteringPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime) + val candidates = 
getRepairCandidates(operation, invalidParquetFiles, clusteringPlan, parallelism) + if (candidates.isEmpty) { + Seq.empty + } else { + val candidatePaths = candidates.map(_.path).toSet + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(clusteringPlan, candidatePaths)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. " + + "Set allow_empty_plan => true to allow this operation.") + } + + if (dryRun) { + candidates.map(candidate => + Row(instantTime, candidate.path, RepairClusteringPlanProcedure.WOULD_REMOVE_FROM_PLAN, false, candidate.reason)) + } else { + var client: SparkRDDWriteClient[_] = null + var repairedPaths = Set.empty[String] + val confs = if (options.trim.isEmpty) Map.empty[String, String] else HoodieCLIUtils.extractOptions(options) + val tableNameOpt = tableName.map(_.toString) + try { + client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs, tableNameOpt) + val transactionOwner = HOption.of(metaClient.getInstantGenerator.createNewInstant( + HoodieInstant.State.REQUESTED, + clusteringPlanAction(metaClient, instantTime), + instantTime)) + val txnManager = client.getTransactionManager + txnManager.beginStateChange(transactionOwner, metaClient.reloadActiveTimeline().filterCompletedInstants().lastInstant()) + try { + val latestMetaClient = createMetaClient(jsc, basePath) + val (latestInstant, latestPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(latestMetaClient, instantTime) + val latestPlanFiles = RepairClusteringPlanProcedure.getPlanDataFiles(latestPlan).toSet + val filesToRepair = candidatePaths.intersect(latestPlanFiles) + if (filesToRepair.nonEmpty) { + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(latestPlan, filesToRepair)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. 
" + + "Set allow_empty_plan => true to allow this operation.") + } + + val repairedPlan = RepairClusteringPlanProcedure.pruneClusteringPlan(latestPlan, filesToRepair) + RepairClusteringPlanProcedure.rewriteRequestedClusteringPlan( + latestMetaClient, latestInstant, repairedPlan, latestPlan.getExtraMetadata, shouldBackup) + repairedPaths = filesToRepair + } + } finally { + txnManager.endStateChange(transactionOwner) + } + } finally { + if (client != null) { + client.close() + } + } + + val deleteResults = if (needDelete) { + deletePhysicalFiles(repairedPaths.toSeq) + } else { + Map.empty[String, Boolean] + } + + candidates.map { candidate => + val repaired = repairedPaths.contains(candidate.path) + Row( + instantTime, + candidate.path, + if (repaired) RepairClusteringPlanProcedure.REMOVED_FROM_PLAN else RepairClusteringPlanProcedure.NOT_FOUND_IN_PLAN, + deleteResults.getOrElse(candidate.path, false), + candidate.reason) + } + } + } + } + + private def getOperation(operationArg: Option[Any], invalidParquetFiles: String): String = { + operationArg + .map(_.toString.trim) + .filter(_.nonEmpty) + .getOrElse { + if (invalidParquetFiles.trim.nonEmpty) { + RepairClusteringPlanProcedure.DELETE_OPERATION + } else { + RepairClusteringPlanProcedure.VALIDATE_DELETE_OPERATION + } + } + .toLowerCase(Locale.ROOT) + } + + private def getRepairCandidates(operation: String, + invalidParquetFiles: String, + clusteringPlan: HoodieClusteringPlan, + parallelism: Int): Seq[RepairCandidate] = { + operation match { + case RepairClusteringPlanProcedure.DELETE_OPERATION => + parseInvalidFiles(invalidParquetFiles).map(path => + RepairCandidate(path, RepairClusteringPlanProcedure.USER_REQUESTED)) + case RepairClusteringPlanProcedure.VALIDATE_DELETE_OPERATION | RepairClusteringPlanProcedure.CHECK_AND_DELETE_OPERATION => + validateInvalidParquetFiles(clusteringPlan, parallelism).map(path => + RepairCandidate(path, RepairClusteringPlanProcedure.NOT_PARQUET_FILE)) + case unsupported => + throw new 
UnsupportedOperationException( + s"Unsupported operation: '$unsupported'. Supported operations: delete, validate_delete, checkanddelete") + } + } + + private def parseInvalidFiles(filesParam: String): Seq[String] = { + require(filesParam != null && filesParam.trim.nonEmpty, + "Please set the files to be removed from the clustering plan via the parameter invalid_parquet_files.") + + filesParam.split(",") + .map(_.trim) + .filter(_.nonEmpty) + .distinct + .toSeq + } + + private def validateInvalidParquetFiles(clusteringPlan: HoodieClusteringPlan, + parallelism: Int): Seq[String] = { + val allFiles = RepairClusteringPlanProcedure.getPlanDataFiles(clusteringPlan).toSeq + if (allFiles.isEmpty) { + Seq.empty + } else { + val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration()) + val rddParallelism = Math.max(1, Math.min(allFiles.size, parallelism)) + jsc.parallelize(allFiles.asJava, rddParallelism).rdd.filter { path => + var isInvalid = false + if (path.endsWith(".parquet")) { + try { + ParquetFileReader.readFooter(serHadoopConf.value, new Path(path), SKIP_ROW_GROUPS).getFileMetaData + } catch { + case e: Exception => + isInvalid = Option(e.getMessage).exists(_.contains("is not a Parquet file")) + } + } + isInvalid + }.collect().toSeq + } + } + + private def deletePhysicalFiles(filesToDelete: Seq[String]): Map[String, Boolean] = { + val storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()) + filesToDelete.map { filePath => + val path = new Path(filePath) + val fs = HadoopFSUtils.getFs(path, storageConf.unwrap()) + val deleted = if (fs.exists(path)) { + if (!fs.delete(path, false)) { + throw new HoodieException(s"Failed to delete invalid parquet file after repairing clustering plan: $filePath") + } + true + } else { + false + } + filePath -> deleted + }.toMap + } + + private def clusteringPlanAction(metaClient: HoodieTableMetaClient, instantTime: String): String = { + 
RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime)._1.getAction + } + + override def build: Procedure = new RepairClusteringPlanProcedure() +} + +object RepairClusteringPlanProcedure { + val NAME = "repair_clustering_plan" + + val DELETE_OPERATION = "delete" + val VALIDATE_DELETE_OPERATION = "validate_delete" + val CHECK_AND_DELETE_OPERATION = "checkanddelete" Review Comment: 🤖 nit: `"checkanddelete"` is inconsistent with `"validate_delete"` — the other operation strings use underscores. Could you rename it to `"check_and_delete"` for consistency? The error message on line 191 surfaces both names side-by-side, which makes the mismatch especially visible to users. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairClusteringPlanProcedure.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.avro.model.{HoodieClusteringPlan, HoodieRequestedReplaceMetadata} +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.model.WriteOperationType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.io.util.FileIOUtils +import org.apache.hudi.storage.StoragePath + +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import org.apache.spark.util.SerializableConfiguration + +import java.util +import java.util.Locale +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +class RepairClusteringPlanProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "instant", DataTypes.StringType), + ProcedureParameter.optional(3, "op", DataTypes.StringType), + ProcedureParameter.optional(4, "invalid_parquet_files", DataTypes.StringType, ""), + ProcedureParameter.optional(5, "validation_parallelism", DataTypes.IntegerType, 100), + ProcedureParameter.optional(6, "need_delete", DataTypes.BooleanType, false), + ProcedureParameter.optional(7, "dry_run", DataTypes.BooleanType, true), + ProcedureParameter.optional(8, "backup", DataTypes.BooleanType, true), + 
ProcedureParameter.optional(9, "allow_empty_plan", DataTypes.BooleanType, false), + ProcedureParameter.optional(10, "options", DataTypes.StringType, "") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("reason", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.toString + val invalidParquetFiles = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("") + val operation = getOperation(getArgValueOrDefault(args, PARAMETERS(3)), invalidParquetFiles) + val parallelism = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val needDelete = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] + val dryRun = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[Boolean] + val shouldBackup = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[Boolean] + val allowEmptyPlan = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Boolean] + val options = getArgValueOrDefault(args, PARAMETERS(10)).map(_.toString).getOrElse("") + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + val (_, clusteringPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime) + val candidates = 
getRepairCandidates(operation, invalidParquetFiles, clusteringPlan, parallelism) + if (candidates.isEmpty) { + Seq.empty + } else { + val candidatePaths = candidates.map(_.path).toSet + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(clusteringPlan, candidatePaths)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. " + + "Set allow_empty_plan => true to allow this operation.") + } + + if (dryRun) { + candidates.map(candidate => + Row(instantTime, candidate.path, RepairClusteringPlanProcedure.WOULD_REMOVE_FROM_PLAN, false, candidate.reason)) + } else { + var client: SparkRDDWriteClient[_] = null + var repairedPaths = Set.empty[String] + val confs = if (options.trim.isEmpty) Map.empty[String, String] else HoodieCLIUtils.extractOptions(options) + val tableNameOpt = tableName.map(_.toString) + try { + client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs, tableNameOpt) + val transactionOwner = HOption.of(metaClient.getInstantGenerator.createNewInstant( + HoodieInstant.State.REQUESTED, + clusteringPlanAction(metaClient, instantTime), + instantTime)) + val txnManager = client.getTransactionManager + txnManager.beginStateChange(transactionOwner, metaClient.reloadActiveTimeline().filterCompletedInstants().lastInstant()) + try { + val latestMetaClient = createMetaClient(jsc, basePath) + val (latestInstant, latestPlan) = RepairClusteringPlanProcedure.getRequestedClusteringPlan(latestMetaClient, instantTime) + val latestPlanFiles = RepairClusteringPlanProcedure.getPlanDataFiles(latestPlan).toSet + val filesToRepair = candidatePaths.intersect(latestPlanFiles) + if (filesToRepair.nonEmpty) { + if (!allowEmptyPlan && !RepairClusteringPlanProcedure.hasRetainedInputGroup(latestPlan, filesToRepair)) { + throw new HoodieException( + s"Repairing clustering instant $instantTime would remove all input groups. 
" + + "Set allow_empty_plan => true to allow this operation.") + } + + val repairedPlan = RepairClusteringPlanProcedure.pruneClusteringPlan(latestPlan, filesToRepair) + RepairClusteringPlanProcedure.rewriteRequestedClusteringPlan( + latestMetaClient, latestInstant, repairedPlan, latestPlan.getExtraMetadata, shouldBackup) + repairedPaths = filesToRepair + } + } finally { + txnManager.endStateChange(transactionOwner) + } + } finally { + if (client != null) { + client.close() + } + } + + val deleteResults = if (needDelete) { + deletePhysicalFiles(repairedPaths.toSeq) + } else { + Map.empty[String, Boolean] + } + + candidates.map { candidate => + val repaired = repairedPaths.contains(candidate.path) + Row( + instantTime, + candidate.path, + if (repaired) RepairClusteringPlanProcedure.REMOVED_FROM_PLAN else RepairClusteringPlanProcedure.NOT_FOUND_IN_PLAN, + deleteResults.getOrElse(candidate.path, false), + candidate.reason) + } + } + } + } + + private def getOperation(operationArg: Option[Any], invalidParquetFiles: String): String = { + operationArg + .map(_.toString.trim) + .filter(_.nonEmpty) + .getOrElse { + if (invalidParquetFiles.trim.nonEmpty) { + RepairClusteringPlanProcedure.DELETE_OPERATION + } else { + RepairClusteringPlanProcedure.VALIDATE_DELETE_OPERATION + } + } + .toLowerCase(Locale.ROOT) + } + + private def getRepairCandidates(operation: String, + invalidParquetFiles: String, + clusteringPlan: HoodieClusteringPlan, + parallelism: Int): Seq[RepairCandidate] = { + operation match { + case RepairClusteringPlanProcedure.DELETE_OPERATION => + parseInvalidFiles(invalidParquetFiles).map(path => + RepairCandidate(path, RepairClusteringPlanProcedure.USER_REQUESTED)) + case RepairClusteringPlanProcedure.VALIDATE_DELETE_OPERATION | RepairClusteringPlanProcedure.CHECK_AND_DELETE_OPERATION => + validateInvalidParquetFiles(clusteringPlan, parallelism).map(path => + RepairCandidate(path, RepairClusteringPlanProcedure.NOT_PARQUET_FILE)) + case unsupported => + throw new 
UnsupportedOperationException( + s"Unsupported operation: '$unsupported'. Supported operations: delete, validate_delete, checkanddelete") + } + } + + private def parseInvalidFiles(filesParam: String): Seq[String] = { + require(filesParam != null && filesParam.trim.nonEmpty, + "Please set the files to be removed from the clustering plan via the parameter invalid_parquet_files.") + + filesParam.split(",") + .map(_.trim) + .filter(_.nonEmpty) + .distinct + .toSeq + } + + private def validateInvalidParquetFiles(clusteringPlan: HoodieClusteringPlan, + parallelism: Int): Seq[String] = { + val allFiles = RepairClusteringPlanProcedure.getPlanDataFiles(clusteringPlan).toSeq + if (allFiles.isEmpty) { + Seq.empty + } else { + val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration()) + val rddParallelism = Math.max(1, Math.min(allFiles.size, parallelism)) + jsc.parallelize(allFiles.asJava, rddParallelism).rdd.filter { path => + var isInvalid = false + if (path.endsWith(".parquet")) { + try { + ParquetFileReader.readFooter(serHadoopConf.value, new Path(path), SKIP_ROW_GROUPS).getFileMetaData + } catch { + case e: Exception => + isInvalid = Option(e.getMessage).exists(_.contains("is not a Parquet file")) + } + } + isInvalid + }.collect().toSeq + } + } + + private def deletePhysicalFiles(filesToDelete: Seq[String]): Map[String, Boolean] = { + val storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()) + filesToDelete.map { filePath => + val path = new Path(filePath) + val fs = HadoopFSUtils.getFs(path, storageConf.unwrap()) + val deleted = if (fs.exists(path)) { + if (!fs.delete(path, false)) { + throw new HoodieException(s"Failed to delete invalid parquet file after repairing clustering plan: $filePath") + } + true + } else { + false + } + filePath -> deleted + }.toMap + } + + private def clusteringPlanAction(metaClient: HoodieTableMetaClient, instantTime: String): String = { + 
RepairClusteringPlanProcedure.getRequestedClusteringPlan(metaClient, instantTime)._1.getAction + } + + override def build: Procedure = new RepairClusteringPlanProcedure() +} + +object RepairClusteringPlanProcedure { + val NAME = "repair_clustering_plan" + + val DELETE_OPERATION = "delete" + val VALIDATE_DELETE_OPERATION = "validate_delete" + val CHECK_AND_DELETE_OPERATION = "checkanddelete" + + val WOULD_REMOVE_FROM_PLAN = "WOULD_REMOVE_FROM_PLAN" + val REMOVED_FROM_PLAN = "REMOVED_FROM_PLAN" + val NOT_FOUND_IN_PLAN = "NOT_FOUND_IN_PLAN" + val USER_REQUESTED = "USER_REQUESTED" + val NOT_PARQUET_FILE = "NOT_PARQUET_FILE" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): ProcedureBuilder = new RepairClusteringPlanProcedure() + } + + private def getRequestedClusteringPlan(metaClient: HoodieTableMetaClient, + instantTime: String): (HoodieInstant, HoodieClusteringPlan) = { + val pendingPlan = ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala + .find(plan => plan.getLeft.requestedTime == instantTime && plan.getLeft.isRequested) + .getOrElse { + throw new HoodieException( + s"The requested clustering plan for instant $instantTime does not exist. Modifications are not supported.") + } + + (pendingPlan.getLeft, pendingPlan.getRight) + } + + private def getPlanDataFiles(plan: HoodieClusteringPlan): Iterable[String] = { + Option(plan.getInputGroups) + .map(_.asScala) + .getOrElse(Seq.empty) + .flatMap(group => Option(group.getSlices).map(_.asScala).getOrElse(Seq.empty)) + .map(_.getDataFilePath) Review Comment: 🤖 `pruneClusteringPlan` mutates the input `plan` (via `group.setSlices` and `plan.setInputGroups`) rather than returning a fresh copy. It's safe here because the plan is freshly loaded inside the lock, but the name suggests a pure transformation — worth either renaming it or building a new `HoodieClusteringPlan` to avoid surprising future callers. <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
