This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ba276029311 feat(table-services): Support clustering file groups with 
earlier instants times first (#18174)
9ba276029311 is described below

commit 9ba2760293115d68e679b53f1004fa20b411fd2e
Author: Krishen <[email protected]>
AuthorDate: Thu Mar 5 14:17:53 2026 -0800

    feat(table-services): Support clustering file groups with earlier instants 
times first (#18174)
    
    New config hoodie.clustering.plan.strategy.file.slices.sort.by (default 
SIZE). Accepts a comma-separated list of sort fields. Setting it to 
INSTANT_TIME,SIZE causes PartitionAwareClusteringPlanStrategy to sort file 
slices by base file commit time ascending first, then by file size descending, 
so older data is clustered first.
    
    Changelog:
    
    ClusteringFileSliceSortByField: New enum defining the available sort 
fields: INSTANT_TIME (commit time ascending) and SIZE (file size descending).
    ClusteringFileSliceComparator: New utility that parses the comma-separated 
config value into a composite Comparator<FileSlice>, combining individual field 
comparators in the specified order.
    HoodieClusteringConfig: Added PLAN_STRATEGY_FILE_SLICES_SORT_BY config 
property (key hoodie.clustering.plan.strategy.file.slices.sort.by, default 
SIZE) and Builder.withFileSlicesSortBy(String).
    HoodieWriteConfig: Added getFileSlicesSortBy().
    PartitionAwareClusteringPlanStrategy: Replaced inline comparator logic with 
a call to ClusteringFileSliceComparator.buildComparator(writeConfig).
    TestSparkSizeBasedClusteringPlanStrategy: Updated tests to use 
withFileSlicesSortBy("INSTANT_TIME,SIZE" / "SIZE"): 
testSortByInstantTimeThenSize, testSortBySizeOnly, 
testCommitTimeOrderingWithSameSizes, 
testSortingBehaviorComparisonInstantTimeVsSizeOnly.
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  16 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../cluster/ClusteringFileSliceComparator.java     |  69 ++++++++
 .../cluster/ClusteringFileSliceSortByField.java    |  38 +++++
 .../PartitionAwareClusteringPlanStrategy.java      |   8 +-
 .../TestSparkSizeBasedClusteringPlanStrategy.java  | 175 ++++++++++++++++++++-
 6 files changed, 304 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index b489ae646fa4..eb47688f2aea 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -217,6 +217,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("Whether to generate clustering plan when there is 
only one file group involved, by default true");
 
+  public static final ConfigProperty<String> PLAN_STRATEGY_FILE_SLICES_SORT_BY 
= ConfigProperty
+      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "file.slices.sort.by")
+      .defaultValue("SIZE")
+      .markAdvanced()
+      .sinceVersion("1.2.0")
+      .withDocumentation("Comma-separated list of fields to sort file slices 
by when packing files together within a partition "
+          + "to create clustering groups. "
+          + "Available fields: INSTANT_TIME (sort by commit time ascending, so 
that older data files are clustered first), "
+          + "SIZE (sort by file size descending). For example, 
'INSTANT_TIME,SIZE' sorts by commit time first then by size. "
+          + "Default 'SIZE' sorts by file size only.");
+
   public static final ConfigProperty<String> PLAN_STRATEGY_SORT_COLUMNS = 
ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns")
       .noDefaultValue()
@@ -594,6 +605,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withFileSlicesSortBy(String sortByFields) {
+      clusteringConfig.setValue(PLAN_STRATEGY_FILE_SLICES_SORT_BY, 
sortByFields);
+      return this;
+    }
+
     public Builder withInlineClustering(Boolean inlineClustering) {
       clusteringConfig.setValue(INLINE_CLUSTERING, 
String.valueOf(inlineClustering));
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 144d62468c19..f961fca2c314 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2020,6 +2020,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getLong(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES);
   }
 
+  public String getFileSlicesSortBy() {
+    return getString(HoodieClusteringConfig.PLAN_STRATEGY_FILE_SLICES_SORT_BY);
+  }
+
   public int getTargetPartitionsForClustering() {
     return getInt(HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceComparator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceComparator.java
new file mode 100644
index 000000000000..bd06aae4290e
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceComparator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Builds a {@link Comparator} for {@link FileSlice} based on the
+ * {@link 
org.apache.hudi.config.HoodieClusteringConfig#PLAN_STRATEGY_FILE_SLICES_SORT_BY}
 config.
+ * The comparators for each field are combined in the order specified, so 
earlier fields take priority.
+ */
+public class ClusteringFileSliceComparator {
+
+  public static Comparator<FileSlice> buildComparator(HoodieWriteConfig 
config) {
+    String sortByFields = config.getFileSlicesSortBy();
+
+    List<ClusteringFileSliceSortByField> fields = 
Arrays.stream(sortByFields.split(","))
+        .map(String::trim)
+        .map(s -> ClusteringFileSliceSortByField.valueOf(s.toUpperCase()))
+        .collect(Collectors.toList());
+
+    if (fields.isEmpty()) {
+      throw new HoodieClusteringException("At least one sort field must be 
specified in: " + sortByFields);
+    }
+
+    Comparator<FileSlice> comparator = comparatorForField(fields.get(0), 
config);
+    for (int i = 1; i < fields.size(); i++) {
+      comparator = comparator.thenComparing(comparatorForField(fields.get(i), 
config));
+    }
+    return comparator;
+  }
+
+  private static Comparator<FileSlice> 
comparatorForField(ClusteringFileSliceSortByField field, HoodieWriteConfig 
config) {
+    switch (field) {
+      case INSTANT_TIME:
+        return Comparator.comparing(fileSlice ->
+            fileSlice.getBaseFile().map(baseFile -> 
baseFile.getCommitTime()).orElse(""));
+      case SIZE:
+        return Comparator.comparing(
+            (FileSlice fileSlice) -> fileSlice.getBaseFile().map(baseFile -> 
baseFile.getFileSize()).orElse(config.getParquetMaxFileSize()),
+            Comparator.reverseOrder());
+      default:
+        throw new HoodieClusteringException("Unknown file slice sort field: " 
+ field);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceSortByField.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceSortByField.java
new file mode 100644
index 000000000000..991471a7907d
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceSortByField.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+/**
+ * Fields by which file slices can be sorted when creating clustering groups.
+ */
+@EnumDescription("Fields by which file slices are sorted when creating 
clustering groups. "
+    + "Multiple fields can be specified as a comma-separated list to define 
sort priority.")
+public enum ClusteringFileSliceSortByField {
+
+  @EnumFieldDescription("Sort by the commit/instant time of the file slice's 
base file in ascending order, "
+      + "so that older data files are clustered first (e.g. to reduce 
stitching lag).")
+  INSTANT_TIME,
+
+  @EnumFieldDescription("Sort by the file size of the file slice's base file 
in descending order, "
+      + "so that larger files are clustered first.")
+  SIZE
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 0d445a7e00f9..08d25c7907e1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.IncrementalPartitionAwareStrategy;
+import org.apache.hudi.table.action.cluster.ClusteringFileSliceComparator;
 import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
 import org.apache.hudi.util.Lazy;
@@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -68,11 +70,9 @@ public abstract class 
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
 
-    // Sort fileSlices before dividing, which makes dividing more compact
+    Comparator<FileSlice> sortedFileSlicesComparator = 
ClusteringFileSliceComparator.buildComparator(writeConfig);
     List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
-    sortedFileSlices.sort((o1, o2) -> (int)
-        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() 
: writeConfig.getParquetMaxFileSize())
-            - (o1.getBaseFile().isPresent() ? 
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+    sortedFileSlices.sort(sortedFileSlicesComparator);
 
     long totalSizeSoFar = 0;
     boolean partialScheduled = false;
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
index 1f95ea472f7a..d7f7cfc4a3d2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
@@ -19,10 +19,12 @@
 package org.apache.hudi.client.clustering.plan.strategy;
 
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
@@ -82,10 +84,179 @@ public class TestSparkSizeBasedClusteringPlanStrategy {
     Assertions.assertEquals(1, 
clusteringGroups.get(1).getNumOutputFileGroups());
   }
 
+  @Test
+  public void testSortByInstantTimeThenSize() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+            .withClusteringMaxBytesInGroup(750)
+            .withClusteringTargetFileMaxBytes(1000)
+            .withClusteringPlanSmallFileLimit(50)
+            .withClusteringMaxNumGroups(1)
+            .withFileSlicesSortBy("INSTANT_TIME,SIZE")
+            .build())
+        .build();
+
+    SparkSizeBasedClusteringPlanStrategy planStrategy = new 
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+
+    ArrayList<FileSlice> fileSlices = new ArrayList<>();
+    fileSlices.add(createFileSliceWithCommitTime(400, "003"));
+    fileSlices.add(createFileSliceWithCommitTime(500, "001"));
+    fileSlices.add(createFileSliceWithCommitTime(200, "001"));
+    fileSlices.add(createFileSliceWithCommitTime(100, "002"));
+    fileSlices.add(createFileSliceWithCommitTime(300, "002"));
+
+    Pair<Stream<HoodieClusteringGroup>, Boolean> result =
+        planStrategy.buildClusteringGroupsForPartition("p0", fileSlices);
+    List<HoodieClusteringGroup> clusteringGroups =
+        ((Stream<HoodieClusteringGroup>) 
result.getLeft()).collect(Collectors.toList());
+
+    Assertions.assertEquals(1, clusteringGroups.size());
+
+    List<HoodieSliceInfo> slicesInPlan = clusteringGroups.get(0).getSlices();
+    Assertions.assertEquals(2, slicesInPlan.size());
+    for (HoodieSliceInfo slice : slicesInPlan) {
+      Assertions.assertTrue(slice.getDataFilePath().contains("001"),
+          "Expected only slices from earliest commit '001', but found: " + 
slice.getDataFilePath());
+    }
+
+    Assertions.assertTrue(result.getRight(), "Should indicate partial 
scheduling since not all slices were processed");
+  }
+
+  @Test
+  public void testSortBySizeOnly() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+            .withClusteringMaxBytesInGroup(2000)
+            .withClusteringTargetFileMaxBytes(1000)
+            .withClusteringPlanSmallFileLimit(500)
+            .withFileSlicesSortBy("SIZE")
+            .build())
+        .build();
+
+    SparkSizeBasedClusteringPlanStrategy planStrategy = new 
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+
+    ArrayList<FileSlice> fileSlices = new ArrayList<>();
+    fileSlices.add(createFileSliceWithCommitTime(400, "003"));
+    fileSlices.add(createFileSliceWithCommitTime(200, "001"));
+    fileSlices.add(createFileSliceWithCommitTime(300, "002"));
+    fileSlices.add(createFileSliceWithCommitTime(500, "001"));
+
+    Stream<HoodieClusteringGroup> clusteringGroupStream =
+        (Stream<HoodieClusteringGroup>) 
planStrategy.buildClusteringGroupsForPartition("p0", fileSlices).getLeft();
+    List<HoodieClusteringGroup> clusteringGroups = 
clusteringGroupStream.collect(Collectors.toList());
+
+    Assertions.assertTrue(clusteringGroups.size() > 0);
+
+    HoodieClusteringGroup firstGroup = clusteringGroups.get(0);
+    Assertions.assertTrue(firstGroup.getSlices().size() > 0);
+  }
+
+  @Test
+  public void testCommitTimeOrderingWithSameSizes() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+            .withClusteringMaxBytesInGroup(300)
+            .withClusteringTargetFileMaxBytes(1000)
+            .withClusteringPlanSmallFileLimit(1000)
+            .withFileSlicesSortBy("INSTANT_TIME,SIZE")
+            .build())
+        .build();
+
+    SparkSizeBasedClusteringPlanStrategy planStrategy = new 
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+
+    ArrayList<FileSlice> fileSlices = new ArrayList<>();
+    fileSlices.add(createFileSliceWithCommitTime(300, "003"));
+    fileSlices.add(createFileSliceWithCommitTime(300, "001"));
+    fileSlices.add(createFileSliceWithCommitTime(300, "002"));
+
+    Stream<HoodieClusteringGroup> clusteringGroupStream =
+        (Stream<HoodieClusteringGroup>) 
planStrategy.buildClusteringGroupsForPartition("p0", fileSlices).getLeft();
+    List<HoodieClusteringGroup> clusteringGroups = 
clusteringGroupStream.collect(Collectors.toList());
+
+    
Assertions.assertTrue(clusteringGroups.get(0).getSlices().get(0).getDataFilePath().contains("001"));
+    
Assertions.assertTrue(clusteringGroups.get(1).getSlices().get(0).getDataFilePath().contains("002"));
+    
Assertions.assertTrue(clusteringGroups.get(2).getSlices().get(0).getDataFilePath().contains("003"));
+  }
+
+  @Test
+  public void testSortingBehaviorComparisonInstantTimeVsSizeOnly() {
+    HoodieWriteConfig configEnabled = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+            .withClusteringMaxBytesInGroup(200)
+            .withClusteringTargetFileMaxBytes(1000)
+            .withClusteringPlanSmallFileLimit(1000)
+            .withFileSlicesSortBy("INSTANT_TIME,SIZE")
+            .build())
+        .build();
+
+    HoodieWriteConfig configDisabled = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+            .withClusteringMaxBytesInGroup(200)
+            .withClusteringTargetFileMaxBytes(1000)
+            .withClusteringPlanSmallFileLimit(1000)
+            .withFileSlicesSortBy("SIZE")
+            .build())
+        .build();
+
+    SparkSizeBasedClusteringPlanStrategy planStrategyEnabled = new 
SparkSizeBasedClusteringPlanStrategy(table, context, configEnabled);
+    SparkSizeBasedClusteringPlanStrategy planStrategyDisabled = new 
SparkSizeBasedClusteringPlanStrategy(table, context, configDisabled);
+
+    ArrayList<FileSlice> fileSlicesEnabled = new ArrayList<>();
+    ArrayList<FileSlice> fileSlicesDisabled = new ArrayList<>();
+
+    String[] commitTimes = {"001", "002", "003"};
+    long[] fileSizes = {100, 200, 150};
+
+    for (int i = 0; i < commitTimes.length; i++) {
+      fileSlicesEnabled.add(createFileSliceWithCommitTime(fileSizes[i], 
commitTimes[i]));
+      fileSlicesDisabled.add(createFileSliceWithCommitTime(fileSizes[i], 
commitTimes[i]));
+    }
+
+    Stream<HoodieClusteringGroup> streamEnabled =
+        (Stream<HoodieClusteringGroup>) 
planStrategyEnabled.buildClusteringGroupsForPartition("p0", 
fileSlicesEnabled).getLeft();
+    List<HoodieClusteringGroup> groupsEnabled = 
streamEnabled.collect(Collectors.toList());
+
+    Stream<HoodieClusteringGroup> streamDisabled =
+        (Stream<HoodieClusteringGroup>) 
planStrategyDisabled.buildClusteringGroupsForPartition("p0", 
fileSlicesDisabled).getLeft();
+    List<HoodieClusteringGroup> groupsDisabled = 
streamDisabled.collect(Collectors.toList());
+
+    Assertions.assertTrue(groupsEnabled.size() > 0);
+    Assertions.assertTrue(groupsDisabled.size() > 0);
+
+    int totalFilesEnabled = groupsEnabled.stream().mapToInt(g -> 
g.getSlices().size()).sum();
+    int totalFilesDisabled = groupsDisabled.stream().mapToInt(g -> 
g.getSlices().size()).sum();
+
+    Assertions.assertEquals(totalFilesEnabled, totalFilesDisabled);
+    Assertions.assertEquals(3, totalFilesEnabled);
+
+    
Assertions.assertTrue(groupsEnabled.get(0).getSlices().get(0).getDataFilePath().contains("001"));
+    
Assertions.assertTrue(groupsEnabled.get(1).getSlices().get(0).getDataFilePath().contains("002"));
+    
Assertions.assertTrue(groupsEnabled.get(2).getSlices().get(0).getDataFilePath().contains("003"));
+
+    
Assertions.assertTrue(groupsDisabled.get(0).getSlices().get(0).getDataFilePath().contains("002"));
+    
Assertions.assertTrue(groupsDisabled.get(1).getSlices().get(0).getDataFilePath().contains("003"));
+    
Assertions.assertTrue(groupsDisabled.get(2).getSlices().get(0).getDataFilePath().contains("001"));
+  }
+
   private FileSlice createFileSlice(long baseFileSize) {
+    return createFileSliceWithCommitTime(baseFileSize, "001");
+  }
+
+  private FileSlice createFileSliceWithCommitTime(long baseFileSize, String 
commitTime) {
     String fileId = FSUtils.createNewFileId(FSUtils.createNewFileIdPfx(), 0);
-    FileSlice fs = new FileSlice("p0", "001", fileId);
-    HoodieBaseFile f = new HoodieBaseFile(fileId);
+    FileSlice fs = new FileSlice("p0", commitTime, fileId);
+    String basePath = "/test/path/" + fileId + "_" + commitTime + ".parquet";
+    HoodieBaseFile f = new HoodieBaseFile(basePath);
     f.setFileSize(baseFileSize);
     fs.setBaseFile(f);
     return fs;

Reply via email to