This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f2449550c7 Flink, Core: RewriteDataFiles add max file group count
(#14837)
f2449550c7 is described below
commit f2449550c72e8d744b43438ef332716121af2c6b
Author: GuoYu <[email protected]>
AuthorDate: Tue Dec 16 17:01:26 2025 +0800
Flink, Core: RewriteDataFiles add max file group count (#14837)
---
.../actions/SizeBasedFileRewritePlanner.java | 23 +++++++++-
.../java/org/apache/iceberg/util/BinPacking.java | 46 +++++++++++++++++---
.../org/apache/iceberg/util/TestBinPacking.java | 50 +++++++++++++++++++++-
docs/docs/flink-maintenance.md | 1 +
.../flink/maintenance/api/RewriteDataFiles.java | 13 ++++++
.../flink/maintenance/operator/RewriteUtil.java | 8 +++-
.../operator/TestDataFileRewritePlanner.java | 20 +++++++++
7 files changed, 150 insertions(+), 11 deletions(-)
diff --git
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
index fad139078b..bcd0054130 100644
---
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
+++
b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
@@ -107,6 +107,15 @@ public abstract class SizeBasedFileRewritePlanner<
public static final long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 100L * 1024 *
1024 * 1024; // 100 GB
+ /**
+ * This option controls the maximum number of input files that should be rewritten
in a single file group.
+ * It helps with breaking down the rewriting of very large partitions which
may not be rewritable
+ * otherwise due to the resource constraints of the cluster.
+ */
+ public static final String MAX_FILE_GROUP_INPUT_FILES =
"max-file-group-input-files";
+
+ public static final long MAX_FILE_GROUP_INPUT_FILES_DEFAULT = Long.MAX_VALUE;
+
private static final long SPLIT_OVERHEAD = 5L * 1024;
private final Table table;
@@ -116,6 +125,7 @@ public abstract class SizeBasedFileRewritePlanner<
private int minInputFiles;
private boolean rewriteAll;
private long maxGroupSize;
+ private long maxGroupCount;
private int outputSpecId;
protected SizeBasedFileRewritePlanner(Table table) {
@@ -151,6 +161,7 @@ public abstract class SizeBasedFileRewritePlanner<
this.minInputFiles = minInputFiles(options);
this.rewriteAll = rewriteAll(options);
this.maxGroupSize = maxGroupSize(options);
+ this.maxGroupCount = maxGroupCount(options);
this.outputSpecId = outputSpecId(options);
if (rewriteAll) {
@@ -168,7 +179,8 @@ public abstract class SizeBasedFileRewritePlanner<
protected Iterable<List<T>> planFileGroups(Iterable<T> tasks) {
Iterable<T> filteredTasks = rewriteAll ? tasks : filterFiles(tasks);
- BinPacking.ListPacker<T> packer = new
BinPacking.ListPacker<>(maxGroupSize, 1, false);
+ BinPacking.ListPacker<T> packer =
+ new BinPacking.ListPacker<>(maxGroupSize, 1, false, maxGroupCount);
List<List<T>> groups = packer.pack(filteredTasks, ContentScanTask::length);
return rewriteAll ? groups : filterFileGroups(groups);
}
@@ -337,6 +349,15 @@ public abstract class SizeBasedFileRewritePlanner<
return value;
}
+ private long maxGroupCount(Map<String, String> options) {
+ long value =
+ PropertyUtil.propertyAsLong(
+ options, MAX_FILE_GROUP_INPUT_FILES,
MAX_FILE_GROUP_INPUT_FILES_DEFAULT);
+ Preconditions.checkArgument(
+ value > 0, "'%s' is set to %s but must be > 0",
MAX_FILE_GROUP_INPUT_FILES, value);
+ return value;
+ }
+
private boolean rewriteAll(Map<String, String> options) {
return PropertyUtil.propertyAsBoolean(options, REWRITE_ALL,
REWRITE_ALL_DEFAULT);
}
diff --git a/core/src/main/java/org/apache/iceberg/util/BinPacking.java
b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
index f3160389ca..db31b63974 100644
--- a/core/src/main/java/org/apache/iceberg/util/BinPacking.java
+++ b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
@@ -36,11 +36,18 @@ public class BinPacking {
private final long targetWeight;
private final int lookback;
private final boolean largestBinFirst;
+ private final long maxItemsPerBin;
public ListPacker(long targetWeight, int lookback, boolean
largestBinFirst) {
+ this(targetWeight, lookback, largestBinFirst, Long.MAX_VALUE);
+ }
+
+ public ListPacker(
+ long targetWeight, int lookback, boolean largestBinFirst, long
maxItemsPerBin) {
this.targetWeight = targetWeight;
this.lookback = lookback;
this.largestBinFirst = largestBinFirst;
+ this.maxItemsPerBin = maxItemsPerBin;
}
public List<List<T>> packEnd(List<T> items, Function<T, Long> weightFunc) {
@@ -48,13 +55,19 @@ public class BinPacking {
ImmutableList.copyOf(
Iterables.transform(
new PackingIterable<>(
- Lists.reverse(items), targetWeight, lookback,
weightFunc, largestBinFirst),
+ Lists.reverse(items),
+ targetWeight,
+ lookback,
+ weightFunc,
+ largestBinFirst,
+ maxItemsPerBin),
Lists::reverse)));
}
public List<List<T>> pack(Iterable<T> items, Function<T, Long> weightFunc)
{
return ImmutableList.copyOf(
- new PackingIterable<>(items, targetWeight, lookback, weightFunc,
largestBinFirst));
+ new PackingIterable<>(
+ items, targetWeight, lookback, weightFunc, largestBinFirst,
maxItemsPerBin));
}
}
@@ -62,12 +75,13 @@ public class BinPacking {
private final Iterable<T> iterable;
private final long targetWeight;
private final int lookback;
+ private final long maxSize;
private final Function<T, Long> weightFunc;
private final boolean largestBinFirst;
public PackingIterable(
Iterable<T> iterable, long targetWeight, int lookback, Function<T,
Long> weightFunc) {
- this(iterable, targetWeight, lookback, weightFunc, false);
+ this(iterable, targetWeight, lookback, weightFunc, false,
Long.MAX_VALUE);
}
public PackingIterable(
@@ -76,11 +90,22 @@ public class BinPacking {
int lookback,
Function<T, Long> weightFunc,
boolean largestBinFirst) {
+ this(iterable, targetWeight, lookback, weightFunc, largestBinFirst,
Long.MAX_VALUE);
+ }
+
+ public PackingIterable(
+ Iterable<T> iterable,
+ long targetWeight,
+ int lookback,
+ Function<T, Long> weightFunc,
+ boolean largestBinFirst,
+ long maxSize) {
Preconditions.checkArgument(
lookback > 0, "Bin look-back size must be greater than 0: %s",
lookback);
this.iterable = iterable;
this.targetWeight = targetWeight;
this.lookback = lookback;
+ this.maxSize = maxSize;
this.weightFunc = weightFunc;
this.largestBinFirst = largestBinFirst;
}
@@ -88,7 +113,7 @@ public class BinPacking {
@Override
public Iterator<List<T>> iterator() {
return new PackingIterator<>(
- iterable.iterator(), targetWeight, lookback, weightFunc,
largestBinFirst);
+ iterable.iterator(), targetWeight, lookback, maxSize, weightFunc,
largestBinFirst);
}
}
@@ -97,6 +122,7 @@ public class BinPacking {
private final Iterator<T> items;
private final long targetWeight;
private final int lookback;
+ private final long maxSize;
private final Function<T, Long> weightFunc;
private final boolean largestBinFirst;
@@ -104,11 +130,13 @@ public class BinPacking {
Iterator<T> items,
long targetWeight,
int lookback,
+ long maxSize,
Function<T, Long> weightFunc,
boolean largestBinFirst) {
this.items = items;
this.targetWeight = targetWeight;
this.lookback = lookback;
+ this.maxSize = maxSize;
this.weightFunc = weightFunc;
this.largestBinFirst = largestBinFirst;
}
@@ -163,7 +191,7 @@ public class BinPacking {
}
private Bin<T> newBin() {
- return new Bin<>(targetWeight);
+ return new Bin<>(targetWeight, maxSize);
}
private static <T> Bin<T> removeLargestBin(Collection<Bin<T>> bins) {
@@ -181,11 +209,14 @@ public class BinPacking {
private static class Bin<T> {
private final long targetWeight;
+ private final long maxSize;
private final List<T> items = Lists.newArrayList();
private long binWeight = 0L;
+ private int binSize = 0;
- Bin(long targetWeight) {
+ Bin(long targetWeight, long maxSize) {
this.targetWeight = targetWeight;
+ this.maxSize = maxSize;
}
List<T> items() {
@@ -193,11 +224,12 @@ public class BinPacking {
}
boolean canAdd(long weight) {
- return binWeight + weight <= targetWeight;
+ return binWeight + weight <= targetWeight && binSize < maxSize;
}
void add(T item, long weight) {
this.binWeight += weight;
+ this.binSize++;
items.add(item);
}
diff --git a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
index f0cc6db1ad..213f099c8a 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
@@ -61,6 +61,37 @@ public class TestBinPacking {
.isEqualTo(list(list(1, 2, 3, 4, 5)));
}
+ @Test
+ public void testBasicBinPackingTargetSize() {
+ assertThat(pack(list(1, 2, 3, 4, 5), 3, Integer.MAX_VALUE, 2))
+ .as("Should pack the first 2 values")
+ .isEqualTo(list(list(1, 2), list(3), list(4), list(5)));
+
+ assertThat(pack(list(1, 2, 3, 4, 5), 5, Integer.MAX_VALUE, 2))
+ .as("Should pack the first 2 values")
+ .isEqualTo(list(list(1, 2), list(3), list(4), list(5)));
+
+ assertThat(pack(list(1, 2, 3, 4, 5), 6, Integer.MAX_VALUE, 2))
+ .as("Should pack the first 3 values")
+ .isEqualTo(list(list(1, 2), list(3), list(4), list(5)));
+
+ assertThat(pack(list(1, 2, 3, 4, 5), 8, Integer.MAX_VALUE, 3))
+ .as("Should pack the first 3 values")
+ .isEqualTo(list(list(1, 2, 3), list(4), list(5)));
+
+ assertThat(pack(list(1, 2, 3, 4, 5), 9, Integer.MAX_VALUE, 3))
+ .as("Should pack the first 3 values, last 2 values")
+ .isEqualTo(list(list(1, 2, 3), list(4, 5)));
+
+ assertThat(pack(list(1, 2, 3, 4, 5), 10, Integer.MAX_VALUE, 3))
+ .as("Should pack the first 3 values, last 2 values")
+ .isEqualTo(list(list(1, 2, 3), list(4, 5)));
+
+ assertThat(pack(list(1, 2, 3, 4, 5), 14, Integer.MAX_VALUE, 3))
+ .as("Should pack the first 3 values, last 2 values")
+ .isEqualTo(list(list(1, 2, 3), list(4, 5)));
+ }
+
@Test
public void testReverseBinPackingSingleLookback() {
assertThat(packEnd(list(1, 2, 3, 4, 5), 3, 1))
@@ -212,12 +243,27 @@ public class TestBinPacking {
}
private List<List<Integer>> pack(List<Integer> items, long targetWeight, int
lookback) {
- return pack(items, targetWeight, lookback, false);
+ return pack(items, targetWeight, lookback, Long.MAX_VALUE);
}
private List<List<Integer>> pack(
List<Integer> items, long targetWeight, int lookback, boolean
largestBinFirst) {
- ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback,
largestBinFirst);
+ return pack(items, targetWeight, lookback, largestBinFirst,
Long.MAX_VALUE);
+ }
+
+ private List<List<Integer>> pack(
+ List<Integer> items, long targetWeight, int lookback, long targetSize) {
+ return pack(items, targetWeight, lookback, false, targetSize);
+ }
+
+ private List<List<Integer>> pack(
+ List<Integer> items,
+ long targetWeight,
+ int lookback,
+ boolean largestBinFirst,
+ long targetSize) {
+ ListPacker<Integer> packer =
+ new ListPacker<>(targetWeight, lookback, largestBinFirst, targetSize);
return packer.pack(items, Integer::longValue);
}
diff --git a/docs/docs/flink-maintenance.md b/docs/docs/flink-maintenance.md
index a3d3ff1e4b..37fb9a7a84 100644
--- a/docs/docs/flink-maintenance.md
+++ b/docs/docs/flink-maintenance.md
@@ -218,6 +218,7 @@ env.execute("Table Maintenance Job");
| `partialProgressMaxCommits(int)` | Maximum commits allowed for partial progress when partialProgressEnabled is true | 10 | int |
| `maxRewriteBytes(long)` | Maximum bytes to rewrite per execution | Long.MAX_VALUE | long |
| `filter(Expression)` | Filter expression for selecting files to rewrite | Expressions.alwaysTrue() | Expression |
+| `maxFileGroupInputFiles(long)` | Maximum allowed number of input files within a single file group | Long.MAX_VALUE | long |
#### DeleteOrphanFiles Configuration
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index bedf70725a..3b64a79eee 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -181,6 +181,19 @@ public class RewriteDataFiles {
return this;
}
+ /**
+ * Configures the maximum number of input files in a single file group. See {@link
+ * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more
details.
+ *
+ * @param maxFileGroupInputFiles maximum number of input files per file group; must be greater than 0
+ */
+ public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
+ this.rewriteOptions.put(
+ SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
+ String.valueOf(maxFileGroupInputFiles));
+ return this;
+ }
+
/**
* Configures max files to rewrite. See {@link
BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
* for more details.
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 68aaf29ac0..95992ccd97 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ class RewriteUtil {
static List<DataFileRewritePlanner.PlannedGroup>
planDataFileRewrite(TableLoader tableLoader)
throws Exception {
+ return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES,
"2"));
+ }
+
+ static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
+ TableLoader tableLoader, Map<String, String> rewriterOptions) throws
Exception {
try (OneInputStreamOperatorTestHarness<Trigger,
DataFileRewritePlanner.PlannedGroup>
testHarness =
ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ class RewriteUtil {
tableLoader,
11,
10_000_000L,
- ImmutableMap.of(MIN_INPUT_FILES, "2"),
+ rewriterOptions,
Expressions.alwaysTrue()))) {
testHarness.open();
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 9f4f96e106..cb1a41bb43 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.maintenance.operator;
+import static
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
import static
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
import static
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
import static
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
}
}
+ @Test
+ void testMaxFileGroupCount() throws Exception {
+ Table table = createPartitionedTable();
+ insertPartitioned(table, 1, "p1");
+ insertPartitioned(table, 2, "p1");
+ insertPartitioned(table, 3, "p2");
+ insertPartitioned(table, 4, "p2");
+ insertPartitioned(table, 5, "p2");
+ insertPartitioned(table, 6, "p2");
+
+ List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit =
planDataFileRewrite(tableLoader());
+ assertThat(planWithNoLimit).hasSize(2);
+
+ List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
+ planDataFileRewrite(
+ tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2",
MAX_FILE_GROUP_INPUT_FILES, "2"));
+ assertThat(planWithMaxFileGroupCount).hasSize(3);
+ }
+
void assertRewriteFileGroup(
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table,
Set<DataFile> files) {
assertThat(plannedGroup.table().currentSnapshot().snapshotId())