This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 3c4a403a0a3fdb51027c0f2f76106322132513ff
Author: StreamingFlames <18889897...@163.com>
AuthorDate: Wed Dec 14 09:10:50 2022 +0800

    [HUDI-5318] Fix partition pruning for clustering scheduling (#7366)

    Co-authored-by: Nicholas Jiang <programg...@163.com>
    (cherry picked from commit 6de923cfdfdfcc4d265e3af5e12749295c29bb1c)
---
 .../PartitionAwareClusteringPlanStrategy.java      | 24 ++++----
 .../TestPartitionAwareClusteringPlanStrategy.java  |  2 +-
 .../hudi/procedure/TestClusteringProcedure.scala   | 66 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 5d62ef39023..8aafa6d28c4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -71,11 +71,18 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
     HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
     LOG.info("Scheduling clustering for " + metaClient.getBasePath());
     HoodieWriteConfig config = getWriteConfig();
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());
-    // get matched partitions if set
-    partitionPaths = getMatchedPartitions(config, partitionPaths);
-    // filter the partition paths if needed to reduce list status
+    String partitionSelected = config.getClusteringPartitionSelected();
+    List<String> partitionPaths;
+
+    if (StringUtils.isNullOrEmpty(partitionSelected)) {
+      // get matched partitions if set
+      partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
+      // filter the partition paths if needed to reduce list status
+    } else {
+      partitionPaths = Arrays.asList(partitionSelected.split(","));
+    }
+
     partitionPaths = filterPartitionPaths(partitionPaths);
     if (partitionPaths.isEmpty()) {
@@ -114,15 +121,6 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
         .build());
   }
 
-  public List<String> getMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
-    String partitionSelected = config.getClusteringPartitionSelected();
-    if (!StringUtils.isNullOrEmpty(partitionSelected)) {
-      return Arrays.asList(partitionSelected.split(","));
-    } else {
-      return getRegexPatternMatchedPartitions(config, partitionPaths);
-    }
-  }
-
   public List<String> getRegexPatternMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
     String pattern = config.getClusteringPartitionFilterRegexPattern();
     if (!StringUtils.isNullOrEmpty(pattern)) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index 440bc956153..a053a961105 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -71,7 +71,7 @@ public class TestPartitionAwareClusteringPlanStrategy {
     fakeTimeBasedPartitionsPath.add("20210719");
     fakeTimeBasedPartitionsPath.add("20210721");
 
-    List list = strategyTestRegexPattern.getMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
+    List list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
     assertEquals(2, list.size());
     assertTrue(list.contains("20210721"));
     assertTrue(list.contains("20210723"));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index e488811c0db..c10c56f3350 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.config.HoodieClusteringConfig
 import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
@@ -602,6 +603,71 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_clustering with partition selected config") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+       """.stripMargin)
+
+      // Test clustering with PARTITION_SELECTED config set, choose only a part of all partitions to schedule
+      {
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
+        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
+        // Do
+        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(result.length)
+        assertResult("ts=1010")(result(0)(3))
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010),
+          Seq(2, "a2", 10.0, 1010),
+          Seq(3, "a3", 10.0, 1011)
+        )
+      }
+
+      // Test clustering with PARTITION_SELECTED config set, choose all partitions to schedule
+      {
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
+        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
+        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(result.length)
+        assertResult("ts=1010,ts=1011,ts=1012")(result(0)(3))
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010),
+          Seq(2, "a2", 10.0, 1010),
+          Seq(3, "a3", 10.0, 1011),
+          Seq(4, "a4", 10.0, 1010),
+          Seq(5, "a5", 10.0, 1011),
+          Seq(6, "a6", 10.0, 1012)
+        )
+      }
+    }
+  }
+
   def avgRecord(commitTimeline: HoodieTimeline): Long = {
     var totalByteSize = 0L
     var totalRecordsCount = 0L
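For reference, a minimal sketch (not part of the commit) of how the fixed partition-selection path can be exercised from a Spark session, mirroring the new test above. The run_clustering procedure, show_involved_partition parameter, and HoodieClusteringConfig.PARTITION_SELECTED key are taken from the diff; the table name is illustrative and `spark` is assumed to be an active SparkSession with the Hudi Spark bundle on the classpath.

    import org.apache.hudi.config.HoodieClusteringConfig

    // Hypothetical table name; any Hudi table partitioned by `ts` would be used the same way.
    val tableName = "hudi_cluster_demo"

    // Restrict clustering scheduling to one selected partition path instead of listing all partitions.
    spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")

    // With the fix, only the selected partition appears in the generated clustering plan.
    spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)").show(false)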