This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89332241ea0b fix(clustering): retain missing partitions in selected/regex incremental scheduling (#18945)
89332241ea0b is described below

commit 89332241ea0b42671dae81b5ef06cb7ab9fad64b
Author: fhan <[email protected]>
AuthorDate: Wed Jun 10 23:27:13 2026 +0800

    fix(clustering): retain missing partitions in selected/regex incremental scheduling (#18945)
    
    * fix(clustering): retain missing partitions in selected/regex incremental scheduling
    * fix(clustering): refine getMissPartitions helper method
    * fix(clustering): return mutable missing partitions list
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
---
 .../cluster/strategy/ClusteringPlanStrategy.java   | 17 +++++++++++
 .../PartitionAwareClusteringPlanStrategy.java      | 12 ++++----
 .../TestPartitionAwareClusteringPlanStrategy.java  | 34 +++++++++++++++++++++-
 .../action/cluster/TestIncrementalClustering.java  |  4 +--
 4 files changed, 59 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 353f590c3f97..a2a32825d12b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -41,8 +41,11 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -137,6 +140,20 @@ public abstract class ClusteringPlanStrategy<T,I,K,O> implements Serializable {
    */
   protected abstract Map<String, String> getStrategyParams();
 
+  /**
+   * Returns the partitions of the current scheduling window that are not scheduled by this plan, so that
+   * they can be recorded as missing partitions and picked up by later incremental clustering schedules.
+   *
+   * <p>The result preserves the first-seen order of {@code partitionsInCurrentWindow}, contains no
+   * duplicates, and is always a new mutable list. When incremental table services are disabled nothing
+   * is carried over and an empty (mutable) list is returned.
+   *
+   * @param partitionsToSchedule      partitions chosen for scheduling in this plan; {@code null} is treated as empty
+   * @param partitionsInCurrentWindow partitions of the current scheduling window; {@code null} is treated as empty
+   * @return a new, mutable list of window partitions that are absent from {@code partitionsToSchedule}
+   */
+  protected List<String> getMissingPartitionsFromCurrentWindow(List<String> partitionsToSchedule,
+                                                               List<String> partitionsInCurrentWindow) {
+    if (!getWriteConfig().isIncrementalTableServiceEnabled()
+        || partitionsInCurrentWindow == null || partitionsInCurrentWindow.isEmpty()) {
+      return new ArrayList<>();
+    }
+    // LinkedHashSet drops duplicates while keeping the window's order, so the recorded plan is stable.
+    Set<String> missingPartitions = new LinkedHashSet<>(partitionsInCurrentWindow);
+    if (partitionsToSchedule != null && !partitionsToSchedule.isEmpty()) {
+      // Hash-based membership keeps the removal linear in the window size.
+      Set<String> scheduledPartitions = new HashSet<>(partitionsToSchedule);
+      missingPartitions.removeIf(scheduledPartitions::contains);
+    }
+    return new ArrayList<>(missingPartitions);
+  }
+
   /**
    * Returns any specific parameters to be stored as part of clustering 
metadata.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 077f1bb77e5f..3d49c5f406e1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -41,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -169,13 +169,15 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
 
     if (StringUtils.isNullOrEmpty(partitionSelected)) {
       // get matched partitions if set
-      partitionPaths = getRegexPatternMatchedPartitions(config, 
partitions.get());
+      // partitionsInCurrentWindow = incremental partitions + missing 
partitions in last plan
+      List<String> partitionsInCurrentWindow = partitions.get();
+      partitionPaths = getRegexPatternMatchedPartitions(config, 
partitionsInCurrentWindow);
+      missingPartitions = 
getMissingPartitionsFromCurrentWindow(partitionPaths, 
partitionsInCurrentWindow);
       // filter the partition paths if needed to reduce list status
     } else {
       partitionPaths = Arrays.asList(partitionSelected.split(","));
-      // Users may temporarily set specific partitions for clustering.
-      // Ensure the coherence of the missing partitions.
-      missingPartitions = 
(List<String>)executor.fetchMissingPartitions(TableServiceType.CLUSTER).getRight();
+      missingPartitions = getMissingPartitionsFromCurrentWindow(partitionPaths,
+          config.isIncrementalTableServiceEnabled() ? partitions.get() : 
Collections.emptyList());
     }
 
     Pair<List<String>, List<String>> partitionsPair = 
filterPartitionPaths(getWriteConfig(), partitionPaths);
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index 471245e3bc51..23a0b6223ca2 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -31,6 +31,8 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -40,6 +42,8 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestPartitionAwareClusteringPlanStrategy {
 
@@ -83,6 +87,29 @@ public class TestPartitionAwareClusteringPlanStrategy {
     assertTrue(list.contains("20210723"));
   }
 
+  @Test
+  public void testResolveMissingPartitionsFromCurrentWindow() {
+    HoodieWriteConfig incrementalConfig = mock(HoodieWriteConfig.class);
+    when(incrementalConfig.isIncrementalTableServiceEnabled()).thenReturn(true);
+    DummyPartitionAwareClusteringPlanStrategy incrementalStrategy =
+        new DummyPartitionAwareClusteringPlanStrategy(table, context, incrementalConfig);
+
+    // Window partitions that are not scheduled are retained, in window order.
+    List<String> incrementalMissingPartitions = incrementalStrategy.resolveMissingPartitionsFromCurrentWindow(
+        Arrays.asList("p1", "p3"), Arrays.asList("p1", "p2", "p3", "p4"));
+    assertEquals(Arrays.asList("p2", "p4"), incrementalMissingPartitions);
+    // The returned list is expected to be mutable in the incremental case as well.
+    incrementalMissingPartitions.add("p5");
+    assertEquals(Arrays.asList("p2", "p4", "p5"), incrementalMissingPartitions);
+
+    // Duplicate window entries collapse to a single entry at their first position.
+    assertEquals(Arrays.asList("p4", "p2"),
+        incrementalStrategy.resolveMissingPartitionsFromCurrentWindow(
+            Collections.singletonList("p1"), Arrays.asList("p4", "p1", "p2", "p4")));
+
+    // Scheduled partitions outside the window (e.g. explicitly selected ones) do not affect the result.
+    assertEquals(Collections.singletonList("p2"),
+        incrementalStrategy.resolveMissingPartitionsFromCurrentWindow(
+            Arrays.asList("p1", "p9"), Arrays.asList("p1", "p2")));
+
+    HoodieWriteConfig nonIncrementalConfig = mock(HoodieWriteConfig.class);
+    when(nonIncrementalConfig.isIncrementalTableServiceEnabled()).thenReturn(false);
+    DummyPartitionAwareClusteringPlanStrategy nonIncrementalStrategy =
+        new DummyPartitionAwareClusteringPlanStrategy(table, context, nonIncrementalConfig);
+
+    // Without incremental table services nothing is carried over, but the result is still mutable.
+    List<String> nonIncrementalMissingPartitions = nonIncrementalStrategy.resolveMissingPartitionsFromCurrentWindow(
+        Arrays.asList("p1", "p3"), Arrays.asList("p1", "p2", "p3", "p4"));
+    assertTrue(nonIncrementalMissingPartitions.isEmpty());
+    nonIncrementalMissingPartitions.add("p5");
+    assertEquals(Collections.singletonList("p5"), nonIncrementalMissingPartitions);
+  }
+
   @Test
   public void testResolveEngineContextUsesLocalWhenEnabled() {
     HoodieEngineContext engineContext = new HoodieLocalEngineContext(new 
HadoopStorageConfiguration(false));
@@ -127,10 +154,15 @@ public class TestPartitionAwareClusteringPlanStrategy {
       super(table, engineContext, writeConfig);
     }
 
+    /**
+     * Test-only accessor that forwards to the protected
+     * {@code getMissingPartitionsFromCurrentWindow} helper so the test can call it directly.
+     */
+    List<String> resolveMissingPartitionsFromCurrentWindow(List<String> scheduledPartitions, List<String> windowPartitions) {
+      List<String> missing = getMissingPartitionsFromCurrentWindow(scheduledPartitions, windowPartitions);
+      return missing;
+    }
+
     @Override
     protected Map<String, String> getStrategyParams() {
-      return null;
+      // The dummy strategy has no strategy-specific parameters; return an empty map rather than null
+      // so that code reading the params does not need a null check.
+      return Collections.emptyMap();
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java
index 8336a6f7e005..800655065501 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/cluster/TestIncrementalClustering.java
@@ -131,8 +131,8 @@ public class TestIncrementalClustering extends SparkClientFunctionalTestHarness
 
     switch (mode) {
       case NONE: {
-        // For partitions filtered out by the regex expression, they will not 
be recorded in the missingPartitions
-        assertEquals(0, clusteringPlan.getMissingSchedulePartitions().size());
+        assertEquals(1, clusteringPlan.getMissingSchedulePartitions().size());
+        
assertTrue(clusteringPlan.getMissingSchedulePartitions().contains(YESTERDAY));
         break;
       }
       case SELECTED_PARTITIONS: {

Reply via email to