This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 26cb7cdd2e Flink: Backport RewriteDataFiles add max file group count (#14861)
26cb7cdd2e is described below
commit 26cb7cdd2eff04b5cc7a88b899ee37e7d7f32f18
Author: GuoYu <[email protected]>
AuthorDate: Wed Dec 17 19:23:21 2025 +0800
Flink: Backport RewriteDataFiles add max file group count (#14861)
Backports #14837
---
.../flink/maintenance/api/RewriteDataFiles.java | 13 +++++++++++++
.../flink/maintenance/operator/RewriteUtil.java | 8 +++++++-
.../operator/TestDataFileRewritePlanner.java | 20 ++++++++++++++++++++
.../flink/maintenance/api/RewriteDataFiles.java | 13 +++++++++++++
.../flink/maintenance/operator/RewriteUtil.java | 8 +++++++-
.../operator/TestDataFileRewritePlanner.java | 20 ++++++++++++++++++++
6 files changed, 80 insertions(+), 2 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index bedf70725a..3b64a79eee 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -181,6 +181,19 @@ public class RewriteDataFiles {
return this;
}
+ /**
+ * Configures the max file count for rewriting. See {@link
+ * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more details.
+ *
+ * @param maxFileGroupInputFiles file count for rewrite
+ */
+ public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
+ this.rewriteOptions.put(
+ SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
+ String.valueOf(maxFileGroupInputFiles));
+ return this;
+ }
+
/**
* Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
* for more details.
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 68aaf29ac0..95992ccd97 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ class RewriteUtil {
static List<DataFileRewritePlanner.PlannedGroup>
planDataFileRewrite(TableLoader tableLoader)
throws Exception {
+ return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES, "2"));
+ }
+
+ static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
+ TableLoader tableLoader, Map<String, String> rewriterOptions) throws Exception {
try (OneInputStreamOperatorTestHarness<Trigger,
DataFileRewritePlanner.PlannedGroup>
testHarness =
ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ class RewriteUtil {
tableLoader,
11,
10_000_000L,
- ImmutableMap.of(MIN_INPUT_FILES, "2"),
+ rewriterOptions,
Expressions.alwaysTrue()))) {
testHarness.open();
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 9f4f96e106..cb1a41bb43 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.maintenance.operator;
+import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
}
}
+ @Test
+ void testMaxFileGroupCount() throws Exception {
+ Table table = createPartitionedTable();
+ insertPartitioned(table, 1, "p1");
+ insertPartitioned(table, 2, "p1");
+ insertPartitioned(table, 3, "p2");
+ insertPartitioned(table, 4, "p2");
+ insertPartitioned(table, 5, "p2");
+ insertPartitioned(table, 6, "p2");
+
+ List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = planDataFileRewrite(tableLoader());
+ assertThat(planWithNoLimit).hasSize(2);
+
+ List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
+ planDataFileRewrite(
+ tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2", MAX_FILE_GROUP_INPUT_FILES, "2"));
+ assertThat(planWithMaxFileGroupCount).hasSize(3);
+ }
+
void assertRewriteFileGroup(
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table,
Set<DataFile> files) {
assertThat(plannedGroup.table().currentSnapshot().snapshotId())
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index bedf70725a..3b64a79eee 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -181,6 +181,19 @@ public class RewriteDataFiles {
return this;
}
+ /**
+ * Configures the max file count for rewriting. See {@link
+ * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more details.
+ *
+ * @param maxFileGroupInputFiles file count for rewrite
+ */
+ public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
+ this.rewriteOptions.put(
+ SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
+ String.valueOf(maxFileGroupInputFiles));
+ return this;
+ }
+
/**
* Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
* for more details.
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 68aaf29ac0..95992ccd97 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ class RewriteUtil {
static List<DataFileRewritePlanner.PlannedGroup>
planDataFileRewrite(TableLoader tableLoader)
throws Exception {
+ return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES, "2"));
+ }
+
+ static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
+ TableLoader tableLoader, Map<String, String> rewriterOptions) throws Exception {
try (OneInputStreamOperatorTestHarness<Trigger,
DataFileRewritePlanner.PlannedGroup>
testHarness =
ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ class RewriteUtil {
tableLoader,
11,
10_000_000L,
- ImmutableMap.of(MIN_INPUT_FILES, "2"),
+ rewriterOptions,
Expressions.alwaysTrue()))) {
testHarness.open();
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 9f4f96e106..cb1a41bb43 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.maintenance.operator;
+import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
}
}
+ @Test
+ void testMaxFileGroupCount() throws Exception {
+ Table table = createPartitionedTable();
+ insertPartitioned(table, 1, "p1");
+ insertPartitioned(table, 2, "p1");
+ insertPartitioned(table, 3, "p2");
+ insertPartitioned(table, 4, "p2");
+ insertPartitioned(table, 5, "p2");
+ insertPartitioned(table, 6, "p2");
+
+ List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = planDataFileRewrite(tableLoader());
+ assertThat(planWithNoLimit).hasSize(2);
+
+ List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
+ planDataFileRewrite(
+ tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2", MAX_FILE_GROUP_INPUT_FILES, "2"));
+ assertThat(planWithMaxFileGroupCount).hasSize(3);
+ }
+
void assertRewriteFileGroup(
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table,
Set<DataFile> files) {
assertThat(plannedGroup.table().currentSnapshot().snapshotId())