This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 26cb7cdd2e Flink: Backport RewriteDataFiles add max file group count (#14861)
26cb7cdd2e is described below

commit 26cb7cdd2eff04b5cc7a88b899ee37e7d7f32f18
Author: GuoYu <[email protected]>
AuthorDate: Wed Dec 17 19:23:21 2025 +0800

    Flink: Backport RewriteDataFiles add max file group count (#14861)
    
    Backports #14837
---
 .../flink/maintenance/api/RewriteDataFiles.java      | 13 +++++++++++++
 .../flink/maintenance/operator/RewriteUtil.java      |  8 +++++++-
 .../operator/TestDataFileRewritePlanner.java         | 20 ++++++++++++++++++++
 .../flink/maintenance/api/RewriteDataFiles.java      | 13 +++++++++++++
 .../flink/maintenance/operator/RewriteUtil.java      |  8 +++++++-
 .../operator/TestDataFileRewritePlanner.java         | 20 ++++++++++++++++++++
 6 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index bedf70725a..3b64a79eee 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -181,6 +181,19 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Configures the max file count for rewriting. See {@link
+     * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more 
details.
+     *
+     * @param maxFileGroupInputFiles file count for rewrite
+     */
+    public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
+          String.valueOf(maxFileGroupInputFiles));
+      return this;
+    }
+
     /**
      * Configures max files to rewrite. See {@link 
BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
      * for more details.
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 68aaf29ac0..95992ccd97 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ class RewriteUtil {
 
   static List<DataFileRewritePlanner.PlannedGroup> 
planDataFileRewrite(TableLoader tableLoader)
       throws Exception {
+    return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES, 
"2"));
+  }
+
+  static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
+      TableLoader tableLoader, Map<String, String> rewriterOptions) throws 
Exception {
     try (OneInputStreamOperatorTestHarness<Trigger, 
DataFileRewritePlanner.PlannedGroup>
         testHarness =
             ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ class RewriteUtil {
                     tableLoader,
                     11,
                     10_000_000L,
-                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    rewriterOptions,
                     Expressions.alwaysTrue()))) {
       testHarness.open();
 
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 9f4f96e106..cb1a41bb43 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.maintenance.operator;
 
+import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
 import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
 import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
 import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
     }
   }
 
+  @Test
+  void testMaxFileGroupCount() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+    insertPartitioned(table, 5, "p2");
+    insertPartitioned(table, 6, "p2");
+
+    List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = 
planDataFileRewrite(tableLoader());
+    assertThat(planWithNoLimit).hasSize(2);
+
+    List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
+        planDataFileRewrite(
+            tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2", 
MAX_FILE_GROUP_INPUT_FILES, "2"));
+    assertThat(planWithMaxFileGroupCount).hasSize(3);
+  }
+
   void assertRewriteFileGroup(
       DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, 
Set<DataFile> files) {
     assertThat(plannedGroup.table().currentSnapshot().snapshotId())
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index bedf70725a..3b64a79eee 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -181,6 +181,19 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Configures the max file count for rewriting. See {@link
+     * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more 
details.
+     *
+     * @param maxFileGroupInputFiles file count for rewrite
+     */
+    public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
+          String.valueOf(maxFileGroupInputFiles));
+      return this;
+    }
+
     /**
      * Configures max files to rewrite. See {@link 
BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
      * for more details.
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 68aaf29ac0..95992ccd97 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ class RewriteUtil {
 
   static List<DataFileRewritePlanner.PlannedGroup> 
planDataFileRewrite(TableLoader tableLoader)
       throws Exception {
+    return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES, 
"2"));
+  }
+
+  static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
+      TableLoader tableLoader, Map<String, String> rewriterOptions) throws 
Exception {
     try (OneInputStreamOperatorTestHarness<Trigger, 
DataFileRewritePlanner.PlannedGroup>
         testHarness =
             ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ class RewriteUtil {
                     tableLoader,
                     11,
                     10_000_000L,
-                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    rewriterOptions,
                     Expressions.alwaysTrue()))) {
       testHarness.open();
 
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 9f4f96e106..cb1a41bb43 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.maintenance.operator;
 
+import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
 import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
 import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
 import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
     }
   }
 
+  @Test
+  void testMaxFileGroupCount() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+    insertPartitioned(table, 5, "p2");
+    insertPartitioned(table, 6, "p2");
+
+    List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = 
planDataFileRewrite(tableLoader());
+    assertThat(planWithNoLimit).hasSize(2);
+
+    List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
+        planDataFileRewrite(
+            tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2", 
MAX_FILE_GROUP_INPUT_FILES, "2"));
+    assertThat(planWithMaxFileGroupCount).hasSize(3);
+  }
+
   void assertRewriteFileGroup(
       DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, 
Set<DataFile> files) {
     assertThat(plannedGroup.table().currentSnapshot().snapshotId())

Reply via email to