This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 89332241ea0b fix(clustering): retain missing partitions in
selected/regex incremental scheduling (#18945)
89332241ea0b is described below
commit 89332241ea0b42671dae81b5ef06cb7ab9fad64b
Author: fhan <[email protected]>
AuthorDate: Wed Jun 10 23:27:13 2026 +0800
fix(clustering): retain missing partitions in selected/regex incremental
scheduling (#18945)
* fix(clustering): retain missing partitions in selected/regex incremental
scheduling
* fix(clustering): refine getMissPartitions helper method
* fix(clustering): return mutable missing partitions list
---------
Co-authored-by: fhan <[email protected]>
---
.../cluster/strategy/ClusteringPlanStrategy.java | 17 +++++++++++
.../PartitionAwareClusteringPlanStrategy.java | 12 ++++----
.../TestPartitionAwareClusteringPlanStrategy.java | 34 +++++++++++++++++++++-
.../action/cluster/TestIncrementalClustering.java | 4 +--
4 files changed, 59 insertions(+), 8 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 353f590c3f97..a2a32825d12b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -41,8 +41,11 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -137,6 +140,20 @@ public abstract class ClusteringPlanStrategy<T,I,K,O>
implements Serializable {
*/
protected abstract Map<String, String> getStrategyParams();
+ /**
+ * Keep partitions from the current scheduling window that are not scheduled in this plan as missing
+ * partitions so that they can be picked up by later incremental clustering schedules.
+ */
+ protected List<String> getMissingPartitionsFromCurrentWindow(List<String>
partitionsToSchedule,
+ List<String>
partitionsInCurrentWindow) {
+ if (!getWriteConfig().isIncrementalTableServiceEnabled()) {
+ return new ArrayList<>();
+ }
+ Set<String> missingPartitions = new
LinkedHashSet<>(partitionsInCurrentWindow);
+ missingPartitions.removeAll(new HashSet<>(partitionsToSchedule));
+ return new ArrayList<>(missingPartitions);
+ }
+
/**
* Returns any specific parameters to be stored as part of clustering
metadata.
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 077f1bb77e5f..3d49c5f406e1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -41,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -169,13 +169,15 @@ public abstract class
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
if (StringUtils.isNullOrEmpty(partitionSelected)) {
// get matched partitions if set
- partitionPaths = getRegexPatternMatchedPartitions(config,
partitions.get());
+ // partitionsInCurrentWindow = incremental partitions + missing partitions in last plan
+ List<String> partitionsInCurrentWindow = partitions.get();
+ partitionPaths = getRegexPatternMatchedPartitions(config,
partitionsInCurrentWindow);
+ missingPartitions =
getMissingPartitionsFromCurrentWindow(partitionPaths,
partitionsInCurrentWindow);
// filter the partition paths if needed to reduce list status
} else {
partitionPaths = Arrays.asList(partitionSelected.split(","));
- // Users may temporarily set specific partitions for clustering.
- // Ensure the coherence of the missing partitions.
- missingPartitions =
(List<String>)executor.fetchMissingPartitions(TableServiceType.CLUSTER).getRight();
+ missingPartitions = getMissingPartitionsFromCurrentWindow(partitionPaths,
+ config.isIncrementalTableServiceEnabled() ? partitions.get() :
Collections.emptyList());
}
Pair<List<String>, List<String>> partitionsPair =
filterPartitionPaths(getWriteConfig(), partitionPaths);
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index 471245e3bc51..23a0b6223ca2 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -31,6 +31,8 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -40,6 +42,8 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestPartitionAwareClusteringPlanStrategy {
@@ -83,6 +87,29 @@ public class TestPartitionAwareClusteringPlanStrategy {
assertTrue(list.contains("20210723"));
}
+ @Test
+ public void testResolveMissingPartitionsFromCurrentWindow() {
+ HoodieWriteConfig incrementalConfig = mock(HoodieWriteConfig.class);
+
when(incrementalConfig.isIncrementalTableServiceEnabled()).thenReturn(true);
+ DummyPartitionAwareClusteringPlanStrategy incrementalStrategy =
+ new DummyPartitionAwareClusteringPlanStrategy(table, context,
incrementalConfig);
+
+ assertEquals(Arrays.asList("p2", "p4"),
+ incrementalStrategy.resolveMissingPartitionsFromCurrentWindow(
+ Arrays.asList("p1", "p3"), Arrays.asList("p1", "p2", "p3", "p4")));
+
+ HoodieWriteConfig nonIncrementalConfig = mock(HoodieWriteConfig.class);
+
when(nonIncrementalConfig.isIncrementalTableServiceEnabled()).thenReturn(false);
+ DummyPartitionAwareClusteringPlanStrategy nonIncrementalStrategy =
+ new DummyPartitionAwareClusteringPlanStrategy(table, context,
nonIncrementalConfig);
+
+ List<String> nonIncrementalMissingPartitions =
nonIncrementalStrategy.resolveMissingPartitionsFromCurrentWindow(
+ Arrays.asList("p1", "p3"), Arrays.asList("p1", "p2", "p3", "p4"));
+ assertTrue(nonIncrementalMissingPartitions.isEmpty());
+ nonIncrementalMissingPartitions.addAll(Collections.singletonList("p5"));
+ assertEquals(Collections.singletonList("p5"),
nonIncrementalMissingPartitions);
+ }
+
@Test
public void testResolveEngineContextUsesLocalWhenEnabled() {
HoodieEngineContext engineContext = new HoodieLocalEngineContext(new
HadoopStorageConfiguration(false));
@@ -127,10 +154,15 @@ public class TestPartitionAwareClusteringPlanStrategy {
super(table, engineContext, writeConfig);
}
+ List<String> resolveMissingPartitionsFromCurrentWindow(List<String>
partitionsToSchedule,
+ List<String>
partitionsInCurrentWindow) {
+ return getMissingPartitionsFromCurrentWindow(partitionsToSchedule,
partitionsInCurrentWindow);
+ }
+
@Override
protected Map<String, String> getStrategyParams() {
return null;
}
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java
index 8336a6f7e005..800655065501 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java
@@ -131,8 +131,8 @@ public class TestIncrementalClustering extends
SparkClientFunctionalTestHarness
switch (mode) {
case NONE: {
- // For partitions filtered out by the regex expression, they will not be recorded in the missingPartitions
- assertEquals(0, clusteringPlan.getMissingSchedulePartitions().size());
+ assertEquals(1, clusteringPlan.getMissingSchedulePartitions().size());
+
assertTrue(clusteringPlan.getMissingSchedulePartitions().contains(YESTERDAY));
break;
}
case SELECTED_PARTITIONS: {