This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7588b918de23f12e098fdde6acac97dfd5c559c2
Author: XuQianJin-Stars <[email protected]>
AuthorDate: Fri Feb 10 15:22:23 2023 +0800

    add DropPartitionsProcedure
---
 .../procedures/BackupInvalidParquetProcedure.scala |   2 +-
 .../procedures/DropPartitionsProcedure.scala       | 118 +++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedure/TestDropPartitionsProcedure.scala    |  67 ++++++++++++
 4 files changed, 187 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index f7b50bdc3d4..e0963baa471 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 import java.util.function.Supplier
 
 import scala.collection.JavaConversions.asScalaBuffer
-import scala.jdk.CollectionConverters.seqAsJavaListConverter
+import scala.collection.JavaConverters.seqAsJavaListConverter
 
 class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
new file mode 100644
index 00000000000..b194305e6c5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr, splitPartitionAndDataPredicates}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.types._
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+class DropPartitionsProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with PredicateHelper
+  with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "selected_partitions", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val predicate = getArgValueOrDefault(args, PARAMETERS(2))
+    val parts = getArgValueOrDefault(args, PARAMETERS(3))
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val selectedPartitions: String = (parts, predicate) match {
+      case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
+      case (Some(o), _) => o.asInstanceOf[String]
+      case _ => ""
+    }
+
+    val rows: java.util.List[Row] = new java.util.ArrayList[Row]()
+    var partitionPaths: java.util.List[String] = new java.util.ArrayList[String]()
+    if (selectedPartitions.nonEmpty) {
+      partitionPaths = selectedPartitions.split(",").toList.asJava
+      logInfo(s"Drop partitions : $selectedPartitions")
+    } else {
+      logInfo("No partition to drop")
+    }
+
+    partitionPaths.asScala.foreach(part => {
+      val dropSql = s"ALTER TABLE ${metaClient.getTableConfig.getTableName} DROP PARTITION ($part)"
+      logInfo(s"dropSql: $dropSql")
+      spark.sql(dropSql)
+      rows.add(Row(true, part))
+    })
+
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new DropPartitionsProcedure()
+
+  def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = {
+    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
+    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
+      FileStatusCache.getOrCreate(sparkSession))
+
+    // Resolve partition predicates
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
+    val condition = resolveExpr(sparkSession, predicate, tableSchema)
+    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
+    val (partitionPredicates, dataPredicates) = splitPartitionAndDataPredicates(
+      sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns)
+    checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed")
+
+    // Get all partitions and prune partition by predicates
+    val prunedPartitions = hoodieFileIndex.getPartitionPaths(partitionPredicates)
+    prunedPartitions.map(path => path.getPath.replaceAll("/", ",")).toSet.mkString(",")
+  }
+}
+
+object DropPartitionsProcedure {
+  val NAME = "drop_partitions"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new DropPartitionsProcedure
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 5d945ecbfdb..f54db97b227 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -91,6 +91,7 @@ object HoodieProcedures {
       ,(HelpProcedure.NAME, HelpProcedure.builder)
       ,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder)
       ,(DeleteFsFileProcedure.NAME, DeleteFsFileProcedure.builder)
+      ,(DropPartitionsProcedure.NAME, DropPartitionsProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
new file mode 100644
index 00000000000..a6fb2be45ee
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestDropPartitionsProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test Call drop_partitions Procedure With single-partition Pruning") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey = 'id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by (ts)
+             | location '$basePath'
+           """.stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+        // Test partition pruning with a single predicate: a predicate that
+        // touches a non-partition column must be rejected
+        checkException(
+          s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L and id = 10')"
+        )("Only partition predicates are allowed")
+
+        // Drop the table partitions matched by a partition-only predicate
+        val resultA: Array[Seq[Any]] = spark.sql(
+          s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L')")
+          .collect()
+          .map(row => Seq(row.getBoolean(0), row.getString(1)))
+        assertResult(2)(resultA.length)
+      }
+    }
+  }
+}
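
Usage note (not part of the commit): a minimal Scala sketch of calling the new
procedure, assuming a Hudi table named 'hudi_tbl' partitioned by a long column
'ts' (both names hypothetical). The parameter names and output columns are
taken from DropPartitionsProcedure above.

    // Drop every partition matched by a predicate over partition columns only;
    // predicates on data columns fail with "Only partition predicates are allowed".
    spark.sql("call drop_partitions(table => 'hudi_tbl', predicate => 'ts <= 1001L')").show()

    // Alternatively, drop an explicit comma-separated list of partition specs.
    spark.sql("call drop_partitions(table => 'hudi_tbl', selected_partitions => 'ts=1000,ts=1001')").show()

Each returned row pairs a boolean 'result' with the dropped 'partition' spec.
When both arguments are supplied, 'predicate' takes precedence: the
(parts, predicate) match in call() tests the predicate case first.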
