This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1f5b84131df [opt](iceberg) optimize rewrite_data_files to avoid
generating excessive small files (#60063)
1f5b84131df is described below
commit 1f5b84131dfb1758fc56abab00fca75a1723bf95
Author: Socrates <[email protected]>
AuthorDate: Mon Feb 2 17:53:08 2026 +0800
[opt](iceberg) optimize rewrite_data_files to avoid generating excessive
small files (#60063)
## What problem does this PR solve?
During Iceberg rewrite_data_files operations, when BE count is large, an
unexpected number of small files are generated.
Problem: total_files = task_count × active_BE_count × partition_count
## What is changed and how it works?
This change refines the parallelism strategy of Iceberg
rewrite_data_files to maximize concurrency without producing
extra output files.
- If expectedFileCount < alive BE count, use GATHER. Read parallelism
is no longer forced to 1; it is capped at
min(defaultParallelism, expectedFileCount) so output files never exceed
the expected count.
- If expectedFileCount >= alive BE count, cap per‑BE parallelism to
floor(expectedFileCount / aliveBeCount), and then take
min with defaultParallelism, ensuring total writers do not exceed the
expected file count.
- Updated unit and regression tests to cover GATHER vs non‑GATHER paths
and boundary cases.
## Benefits
- Small data: reduce from 100+ files to ~1 file (90%+ reduction)
- Adaptive strategy, no manual tuning needed
## Check List
- [x] Code changes
- [x] Test strategy described
---------
Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
.../iceberg/rewrite/RewriteDataFileExecutor.java | 20 +
.../iceberg/rewrite/RewriteGroupTask.java | 73 +++
.../org/apache/doris/nereids/StatementContext.java | 18 +
.../plans/physical/PhysicalIcebergTableSink.java | 10 +
.../iceberg/rewrite/RewriteGroupTaskTest.java | 524 +++++++++++++++++++++
.../action/test_iceberg_rewrite_data_files.groovy | 3 +-
...t_iceberg_rewrite_data_files_parallelism.groovy | 205 ++++++++
7 files changed, 852 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
index c900482710f..74afd0052b4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
@@ -22,8 +22,10 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
+import org.apache.doris.system.Backend;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
@@ -69,6 +71,11 @@ public class RewriteDataFileExecutor {
List<RewriteGroupTask> tasks = Lists.newArrayList();
RewriteResultCollector resultCollector = new
RewriteResultCollector(groups.size(), tasks);
+ // Get available BE count once before creating tasks
+ // This avoids calling getBackendsNumber() in each task during
multi-threaded execution.
+ // Use compute group from connect context to align with actual BE
selection for queries.
+ int availableBeCount = getAvailableBeCount();
+
// Create tasks with callbacks
for (RewriteDataGroup group : groups) {
RewriteGroupTask task = new RewriteGroupTask(
@@ -77,6 +84,7 @@ public class RewriteDataFileExecutor {
dorisTable,
connectContext,
targetFileSizeBytes,
+ availableBeCount,
new RewriteGroupTask.RewriteResultCallback() {
@Override
public void onTaskCompleted(Long taskId) {
@@ -153,6 +161,18 @@ public class RewriteDataFileExecutor {
}
}
+ private int getAvailableBeCount() throws UserException {
+ ComputeGroup computeGroup = connectContext.getComputeGroup();
+ List<Backend> backends = computeGroup.getBackendList();
+ int availableBeCount = 0;
+ for (Backend backend : backends) {
+ if (backend.isAlive()) {
+ availableBeCount++;
+ }
+ }
+ return availableBeCount;
+ }
+
/**
* Result collector for concurrent rewrite tasks
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
index 78277dff778..cd623528c76 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
@@ -64,6 +64,7 @@ public class RewriteGroupTask implements
TransientTaskExecutor {
private final Long taskId;
private final AtomicBoolean isCanceled;
private final AtomicBoolean isFinished;
+ private final int availableBeCount;
// for canceling the task
private StmtExecutor stmtExecutor;
@@ -73,12 +74,14 @@ public class RewriteGroupTask implements
TransientTaskExecutor {
IcebergExternalTable dorisTable,
ConnectContext connectContext,
long targetFileSizeBytes,
+ int availableBeCount,
RewriteResultCallback resultCallback) {
this.group = group;
this.transactionId = transactionId;
this.dorisTable = dorisTable;
this.connectContext = connectContext;
this.targetFileSizeBytes = targetFileSizeBytes;
+ this.availableBeCount = availableBeCount;
this.resultCallback = resultCallback;
this.taskId = UUID.randomUUID().getMostSignificantBits();
this.isCanceled = new AtomicBoolean(false);
@@ -234,6 +237,11 @@ public class RewriteGroupTask implements
TransientTaskExecutor {
// Clone session variables from parent
taskContext.setSessionVariable(VariableMgr.cloneSessionVariable(connectContext.getSessionVariable()));
+ // Calculate optimal parallelism and determine distribution strategy
+ RewriteStrategy strategy = calculateRewriteStrategy();
+ // Pipeline engine uses parallelPipelineTaskNum to control instance
parallelism.
+ taskContext.getSessionVariable().parallelPipelineTaskNum =
strategy.parallelism;
+
// Set env and basic identities
taskContext.setEnv(Env.getCurrentEnv());
taskContext.setDatabase(connectContext.getDatabase());
@@ -252,9 +260,74 @@ public class RewriteGroupTask implements
TransientTaskExecutor {
statementContext.setConnectContext(taskContext);
taskContext.setStatementContext(statementContext);
+ // Set GATHER distribution flag if needed (for small data rewrite)
+ statementContext.setUseGatherForIcebergRewrite(strategy.useGather);
+
return taskContext;
}
+ /**
+ * Calculate optimal rewrite strategy including parallelism and
distribution mode.
+ *
+ * The core idea is to precisely control the number of output files:
+ * 1. Calculate expected file count based on data size and target file size
+ * 2. If expected file count is less than available BE count, use GATHER
+ * to collect data to a single node, avoiding excessive writers
+ * 3. Otherwise, limit per-BE parallelism so total writers <= expected
files
+ *
+ * @return RewriteStrategy containing parallelism and distribution settings
+ */
+ private RewriteStrategy calculateRewriteStrategy() {
+ // 1. Calculate expected output file count based on data size
+ long totalSize = group.getTotalSize();
+ int expectedFileCount = (int) Math.ceil((double) totalSize /
targetFileSizeBytes);
+
+ // 2. Use available BE count passed from constructor
+ int availableBeCount = this.availableBeCount;
+ Preconditions.checkState(availableBeCount > 0,
+ "availableBeCount must be greater than 0 for rewrite task");
+
+ // 3. Get default parallelism from session variable (pipeline task num)
+ int defaultParallelism =
connectContext.getSessionVariable().getParallelExecInstanceNum();
+
+ // 4. Determine strategy based on expected file count
+ boolean useGather = false;
+ int optimalParallelism;
+
+ // When expected files < available BEs, collect all data to single node
+ if (expectedFileCount < availableBeCount) {
+ // Small data volume: use GATHER to write to single node
+ // Keep parallelism <= expected files to avoid extra output files
+ useGather = true;
+ optimalParallelism = Math.max(1, Math.min(defaultParallelism,
expectedFileCount));
+ } else {
+ // Larger data volume: limit per-BE parallelism so total writers
<= expected files
+ int maxParallelismByFileCount = Math.max(1, expectedFileCount /
availableBeCount);
+ optimalParallelism = Math.max(1, Math.min(defaultParallelism,
maxParallelismByFileCount));
+ }
+
+ LOG.info("[Rewrite Task] taskId: {}, totalSize: {} bytes,
targetFileSize: {} bytes, "
+ + "expectedFileCount: {}, availableBeCount: {},
defaultParallelism: {}, "
+ + "optimalParallelism: {}, useGather: {}",
+ taskId, totalSize, targetFileSizeBytes, expectedFileCount,
+ availableBeCount, defaultParallelism, optimalParallelism,
useGather);
+
+ return new RewriteStrategy(optimalParallelism, useGather);
+ }
+
+ /**
+ * Strategy for rewrite operation containing parallelism and distribution
settings.
+ */
+ private static class RewriteStrategy {
+ final int parallelism;
+ final boolean useGather;
+
+ RewriteStrategy(int parallelism, boolean useGather) {
+ this.parallelism = parallelism;
+ this.useGather = useGather;
+ }
+ }
+
/**
* Callback interface for task completion
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 3e1945965c9..92b7282a0ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -308,6 +308,9 @@ public class StatementContext implements Closeable {
// IcebergScanNode
// TODO: better solution?
private List<org.apache.iceberg.FileScanTask> icebergRewriteFileScanTasks
= null;
+ // For Iceberg rewrite operations: control whether to use GATHER
distribution
+ // When true, data will be collected to a single node to avoid generating
too many small files
+ private boolean useGatherForIcebergRewrite = false;
private boolean hasNestedColumns;
private final Set<CTEId> mustInlineCTE = new HashSet<>();
@@ -1154,6 +1157,21 @@ public class StatementContext implements Closeable {
return tasks;
}
+ /**
+ * Set whether to use GATHER distribution for Iceberg rewrite operations.
+ * When enabled, data will be collected to a single node to minimize
output files.
+ */
+ public void setUseGatherForIcebergRewrite(boolean useGather) {
+ this.useGatherForIcebergRewrite = useGather;
+ }
+
+ /**
+ * Check if GATHER distribution should be used for Iceberg rewrite
operations.
+ */
+ public boolean isUseGatherForIcebergRewrite() {
+ return this.useGatherForIcebergRewrite;
+ }
+
public boolean isSkipPrunePredicate() {
return skipPrunePredicate;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
index 95f91e7f013..12839e93493 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
@@ -29,6 +29,7 @@ import
org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
import java.util.ArrayList;
@@ -110,6 +111,15 @@ public class PhysicalIcebergTableSink<CHILD_TYPE extends
Plan> extends PhysicalB
*/
@Override
public PhysicalProperties getRequirePhysicalProperties() {
+ // For Iceberg rewrite operations with small data volume,
+ // use GATHER distribution to collect data to a single node
+ // This helps minimize the number of output files
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext != null && connectContext.getStatementContext() !=
null
+ &&
connectContext.getStatementContext().isUseGatherForIcebergRewrite()) {
+ return PhysicalProperties.GATHER;
+ }
+
Set<String> partitionNames = targetTable.getPartitionNames();
if (!partitionNames.isEmpty()) {
List<Integer> columnIdx = new ArrayList<>();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTaskTest.java
new file mode 100644
index 00000000000..93e92655da6
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTaskTest.java
@@ -0,0 +1,524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.rewrite;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.system.SystemInfoService;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Unit tests for RewriteGroupTask, specifically testing
calculateRewriteStrategy logic
+ */
+public class RewriteGroupTaskTest {
+
+ @Mock
+ private RewriteDataGroup mockGroup;
+
+ @Mock
+ private IcebergExternalTable mockTable;
+
+ @Mock
+ private ConnectContext mockConnectContext;
+
+ private SessionVariable sessionVariable;
+
+ @Mock
+ private FileScanTask mockFileScanTask;
+
+ @Mock
+ private DataFile mockDataFile;
+
+ @Mock
+ private Env mockEnv;
+
+ @Mock
+ private SystemInfoService mockSystemInfoService;
+
+ private MockedStatic<Env> mockedStaticEnv;
+
+ private static final long MB = 1024 * 1024L;
+ private static final long GB = 1024 * 1024 * 1024L;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+
+ // Setup common mocks
+ sessionVariable = new SessionVariable();
+ sessionVariable.parallelPipelineTaskNum = 8;
+ sessionVariable.maxInstanceNum = 64;
+
Mockito.when(mockConnectContext.getSessionVariable()).thenReturn(sessionVariable);
+
+ // Mock Env and SystemInfoService
+ mockedStaticEnv = Mockito.mockStatic(Env.class);
+ mockedStaticEnv.when(Env::getCurrentEnv).thenReturn(mockEnv);
+
mockedStaticEnv.when(Env::getCurrentSystemInfo).thenReturn(mockSystemInfoService);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ // Clean up static mock to avoid "already registered" errors
+ if (mockedStaticEnv != null) {
+ mockedStaticEnv.close();
+ mockedStaticEnv = null;
+ }
+ }
+
+ /**
+ * Test small data scenario - should use GATHER distribution
+ * Data: 500MB, Target file size: 512MB
+ * Expected: 1 file, useGather=true, parallelism=1
+ */
+ @Test
+ public void testCalculateRewriteStrategy_SmallData_UseGather() throws
Exception {
+ // Setup: 500MB data, 512MB target file size -> expectedFileCount = 1
+ long totalSize = 500 * MB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ // Create task and invoke private method via reflection
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ // Verify strategy
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ Assertions.assertEquals(1, parallelism, "Parallelism should be 1 for
small data");
+ Assertions.assertTrue(useGather, "Should use GATHER for small data
(expected files <= 1)");
+ }
+
+ /**
+ * Test very small data scenario - data smaller than target file size
+ * Data: 100MB, Target file size: 512MB
+ * Expected: 1 file, useGather=true, parallelism=1
+ */
+ @Test
+ public void testCalculateRewriteStrategy_VerySmallData_UseGather() throws
Exception {
+ long totalSize = 100 * MB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ Assertions.assertEquals(1, parallelism);
+ Assertions.assertTrue(useGather, "Should use GATHER for very small
data");
+ }
+
+ /**
+ * Test medium data scenario - should use GATHER when expected files <
available BEs
+ * Data: 5GB, Target file size: 512MB
+ * Expected: expectedFileCount=11, useGather=true, parallelism=min(8, 11)=8
+ */
+ @Test
+ public void
testCalculateRewriteStrategy_MediumData_UseGatherWhenExpectedFilesNotExceedBeCount()
+ throws Exception {
+ long totalSize = 5 * GB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(5GB / 512MB) = ceil(10.24) = 11
+ // expectedFileCount < availableBeCount, so use GATHER
+ Assertions.assertEquals(8, parallelism, "Parallelism should be limited
by default parallelism");
+ Assertions.assertTrue(useGather, "Should use GATHER when expected
files < available BEs");
+ }
+
+ /**
+ * Test large data scenario - do NOT use GATHER when expected files ==
available BEs
+ * Data: 50GB, Target file size: 512MB
+ * Expected: expectedFileCount=100, useGather=false, parallelism=min(8,
floor(100/100)=1)=1
+ */
+ @Test
+ public void
testCalculateRewriteStrategy_LargeData_NoGatherWhenExpectedFilesEqualBeCount()
+ throws Exception {
+ long totalSize = 50 * GB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(50GB / 512MB) = 100
+ // expectedFileCount == availableBeCount, so do not use GATHER
+ Assertions.assertEquals(1, parallelism, "Parallelism should be limited
by expected files / BE count");
+ Assertions.assertFalse(useGather, "Should NOT use GATHER when expected
files == available BEs");
+ }
+
+ /**
+ * Test boundary case: exactly at threshold (1 file expected)
+ * Data: 512MB, Target file size: 512MB
+ * Expected: 1 file, useGather=true, parallelism=1
+ */
+ @Test
+ public void testCalculateRewriteStrategy_BoundaryCase_ExactlyOneFile()
throws Exception {
+ long totalSize = 512 * MB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(512MB / 512MB) = 1
+ Assertions.assertEquals(1, parallelism);
+ Assertions.assertTrue(useGather, "Should use GATHER when exactly 1
file expected");
+ }
+
+ /**
+ * Test boundary case: just over 1 file
+ * Data: 513MB, Target file size: 512MB
+ * Expected: expectedFileCount=2, useGather=true, parallelism=min(8, 2)=2
+ */
+ @Test
+ public void testCalculateRewriteStrategy_BoundaryCase_JustOverOneFile()
throws Exception {
+ long totalSize = 513 * MB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(513MB / 512MB) = 2
+ // expectedFileCount < availableBeCount, so use GATHER
+ Assertions.assertEquals(2, parallelism);
+ Assertions.assertTrue(useGather, "Should use GATHER when expected
files < available BEs");
+ }
+
+ /**
+ * Test boundary case: expected files equal available BEs - should not use
GATHER
+ * Data: 512MB, Target file size: 64MB
+ * Expected: expectedFileCount=8, useGather=false, parallelism=min(8,
floor(8/8)=1)=1
+ */
+ @Test
+ public void
testCalculateRewriteStrategy_BoundaryCase_EqualToBeCount_NoGather() throws
Exception {
+ long totalSize = 512 * MB;
+ long targetFileSizeBytes = 64 * MB;
+ int availableBeCount = 8;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(512MB / 64MB) = 8
+ // expectedFileCount == availableBeCount, so do not use GATHER
+ Assertions.assertEquals(1, parallelism);
+ Assertions.assertFalse(useGather);
+ }
+
+ /**
+ * Test GATHER with high default parallelism - should cap at expected file
count
+ * Data: 513MB, Target file size: 512MB, Default parallelism: 100
+ * Expected: expectedFileCount=2, useGather=true, parallelism=min(100, 2)=2
+ */
+ @Test
+ public void testCalculateRewriteStrategy_GatherCapsAtExpectedFileCount()
throws Exception {
+ long totalSize = 513 * MB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+ sessionVariable.parallelPipelineTaskNum = 100;
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(513MB / 512MB) = 2
+ // expectedFileCount < availableBeCount, so use GATHER
+ Assertions.assertEquals(2, parallelism);
+ Assertions.assertTrue(useGather);
+ }
+
+ /**
+ * Test with limited BE count - parallelism limited by available BEs
+ * Data: 10GB, Target file size: 512MB, Available BEs: 5
+ * Expected: ~20 files, useGather=false, parallelism=min(8,
floor(21/5)=4)=4
+ */
+ @Test
+ public void testCalculateRewriteStrategy_LimitedByBeCount() throws
Exception {
+ long totalSize = 10 * GB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 5; // Limited BE count
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(10GB / 512MB) = ceil(20.48) = 21
+ // maxParallelismByFileCount = floor(21 / 5) = 4
+ // optimalParallelism = min(8, 4) = 4
+ Assertions.assertEquals(4, parallelism, "Parallelism should be limited
by expected files / BE count");
+ Assertions.assertFalse(useGather);
+ }
+
+ /**
+ * Test with high default parallelism - still use GATHER if expected files
< available BEs
+ * Data: 2GB, Target file size: 512MB, Default parallelism: 100
+ * Expected: expectedFileCount=4, useGather=true, parallelism=min(100, 4)=4
+ */
+ @Test
+ public void
testCalculateRewriteStrategy_UseGatherEvenWithHighDefaultParallelism() throws
Exception {
+ long totalSize = 2 * GB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+ sessionVariable.parallelPipelineTaskNum = 100;
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(2GB / 512MB) = ceil(4.0) = 4
+ // expectedFileCount < availableBeCount, so use GATHER
+ Assertions.assertEquals(4, parallelism, "Parallelism should be limited
by expected file count");
+ Assertions.assertTrue(useGather);
+ }
+
+ /**
+ * Test with very small target file size
+ * Data: 1GB, Target file size: 100MB
+ * Expected: 11 files, useGather=true, parallelism=min(11, 100, 8)=8
+ */
+ @Test
+ public void testCalculateRewriteStrategy_SmallTargetFileSize() throws
Exception {
+ long totalSize = 1 * GB;
+ long targetFileSizeBytes = 100 * MB; // Small target file size
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(1GB / 100MB) = ceil(10.24) = 11
+ // expectedFileCount < availableBeCount, so use GATHER
+ Assertions.assertEquals(8, parallelism);
+ Assertions.assertTrue(useGather);
+ }
+
+ /**
+ * Test minimum parallelism guarantee
+ * Data: 0 bytes (edge case), Target file size: 512MB
+ * Expected: parallelism=1 (guaranteed minimum)
+ */
+ @Test
+ public void testCalculateRewriteStrategy_ZeroData_MinimumParallelism()
throws Exception {
+ long totalSize = 0L;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+ Mockito.when(mockGroup.getTasks()).thenReturn(Collections.emptyList());
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+
+ // expectedFileCount = ceil(0 / 512MB) = 0
+ // optimalParallelism = max(1, min(0, 100, 8)) = max(1, 0) = 1
+ Assertions.assertEquals(1, parallelism, "Parallelism should be at
least 1");
+ }
+
+ /**
+ * Test invalid BE count - should throw when available BE count is zero
+ */
+ @Test
+ public void testCalculateRewriteStrategy_ZeroAvailableBe_Throws() throws
Exception {
+ long totalSize = 1 * GB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 0;
+
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ java.lang.reflect.InvocationTargetException exception =
Assertions.assertThrows(
+ java.lang.reflect.InvocationTargetException.class,
+ () -> invokeCalculateRewriteStrategy(task));
+
+ Assertions.assertTrue(exception.getCause() instanceof
IllegalStateException);
+
Assertions.assertTrue(exception.getCause().getMessage().contains("availableBeCount"),
+ "Exception message should mention availableBeCount");
+ }
+
+ /**
+ * Test realistic scenario with multiple partitions
+ * Data: 3GB across multiple partitions, Target file size: 512MB
+ * Expected: ~6 files, useGather=true, parallelism=min(6, 100, 8)=6
+ */
+ @Test
+ public void testCalculateRewriteStrategy_MultiplePartitions() throws
Exception {
+ long totalSize = 3 * GB;
+ long targetFileSizeBytes = 512 * MB;
+ int availableBeCount = 100;
+
+ // Multiple tasks representing different partitions
+ Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+ Mockito.when(mockGroup.getTasks()).thenReturn(Arrays.asList(
+ mockFileScanTask, mockFileScanTask, mockFileScanTask));
+
+ RewriteGroupTask task = new RewriteGroupTask(
+ mockGroup, 1L, mockTable, mockConnectContext,
+ targetFileSizeBytes, availableBeCount, null);
+
+ Object strategy = invokeCalculateRewriteStrategy(task);
+
+ int parallelism = (int) getFieldValue(strategy, "parallelism");
+ boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+ // expectedFileCount = ceil(3GB / 512MB) = ceil(6.0) = 6
+ // expectedFileCount < availableBeCount, so use GATHER
+ Assertions.assertEquals(6, parallelism);
+ Assertions.assertTrue(useGather);
+ }
+
+ // ========== Helper Methods ==========
+
+ /**
+ * Invoke private method calculateRewriteStrategy using reflection
+ */
+ private Object invokeCalculateRewriteStrategy(RewriteGroupTask task)
throws Exception {
+ Method method =
RewriteGroupTask.class.getDeclaredMethod("calculateRewriteStrategy");
+ method.setAccessible(true);
+ return method.invoke(task);
+ }
+
+ /**
+ * Get field value from RewriteStrategy object using reflection
+ */
+ private Object getFieldValue(Object obj, String fieldName) throws
Exception {
+ Class<?> clazz = obj.getClass();
+ java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(obj);
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
index bc808bfae21..3d43089bd2e 100644
---
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
@@ -492,4 +492,5 @@ suite("test_iceberg_rewrite_data_files",
"p0,external,doris,external_docker,exte
assertTrue(northRecords[0][0] == 3, "NORTH region should still have 3
records")
logger.info("Specific partition rewrite test completed successfully")
-}
\ No newline at end of file
+
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files_parallelism.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files_parallelism.groovy
new file mode 100644
index 00000000000..696fc89371d
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files_parallelism.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
// Regression test for the Iceberg rewrite_data_files parallelism strategy:
// verifies that the number of rewritten output files never exceeds the
// expected file count, for both the GATHER and non-GATHER distribution paths.
suite("test_iceberg_rewrite_data_files_parallelism", "p0,external,doris,external_docker,external_docker_doris") {
    String enabled = context.config.otherConfigs.get("enableIcebergTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("disable iceberg test.")
        return
    }

    // Connection details for the dockerized Iceberg REST catalog backed by MinIO.
    String catalog_name = "test_iceberg_rewrite_data_files_parallelism"
    String db_name = "test_db_parallelism"
    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    sql """drop catalog if exists ${catalog_name}"""
    sql """
        CREATE CATALOG ${catalog_name} PROPERTIES (
            'type'='iceberg',
            'iceberg.catalog.type'='rest',
            'uri' = 'http://${externalEnvIp}:${rest_port}',
            "s3.access_key" = "admin",
            "s3.secret_key" = "password",
            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
            "s3.region" = "us-east-1"
        );"""

    sql """switch ${catalog_name}"""
    sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
    sql """use ${db_name}"""
+
+ def getAliveBeCount = {
+ def backends = sql_return_maparray "SHOW BACKENDS"
+ def alive = backends.findAll { be ->
+ def aliveVal = be.Alive
+ if (aliveVal == null) {
+ aliveVal = be.IsAlive
+ }
+ if (aliveVal == null) {
+ aliveVal = be.alive
+ }
+ return aliveVal != null &&
aliveVal.toString().equalsIgnoreCase("true")
+ }.size()
+ if (alive == 0 && backends.size() > 0) {
+ logger.warn("SHOW BACKENDS does not report Alive, fallback to
total count")
+ alive = backends.size()
+ }
+ assertTrue(alive > 0, "No alive backend found for rewrite test")
+ return alive
+ }
+
+ def createAndInsertSmallBatches = { String tableName, int batches ->
+ sql """DROP TABLE IF EXISTS ${db_name}.${tableName}"""
+ sql """
+ CREATE TABLE ${db_name}.${tableName} (
+ id INT,
+ payload STRING
+ ) ENGINE=iceberg
+ """
+ (1..batches).each { batch ->
+ int base = (batch - 1) * 5
+ sql """
+ INSERT INTO ${db_name}.${tableName} VALUES
+ (${base + 1}, lpad('x', 1024, 'x')),
+ (${base + 2}, lpad('x', 1024, 'x')),
+ (${base + 3}, lpad('x', 1024, 'x')),
+ (${base + 4}, lpad('x', 1024, 'x')),
+ (${base + 5}, lpad('x', 1024, 'x'))
+ """
+ }
+ }
+
+ def getTotalSize = { String tableName ->
+ List<List<Object>> files = sql """SELECT file_size_in_bytes FROM
${tableName}\$files"""
+ assertTrue(files.size() > 0, "Expected at least 1 file before rewrite")
+ return files.inject(0L) { acc, row -> acc + (row[0] as long) }
+ }
+
+ def expectedUpperBound = { long expectedFileCount, int aliveBeCount, int
pipelineParallelism ->
+ boolean useGather = expectedFileCount <= aliveBeCount
+ if (useGather) {
+ return (int) Math.min(expectedFileCount, (long)
pipelineParallelism)
+ }
+ long perBeParallelism = Math.max(1L, (long)(expectedFileCount / (long)
aliveBeCount))
+ perBeParallelism = Math.min(perBeParallelism, (long)
pipelineParallelism)
+ return (int) Math.min(expectedFileCount, perBeParallelism *
aliveBeCount)
+ }
+
    int aliveBeCount = getAliveBeCount()

    // =====================================================================================
    // Case 1: expectedFileCount <= aliveBeCount (GATHER), upper bound is expectedFileCount
    // =====================================================================================
    logger.info("Starting gather boundary case (expectedFileCount <= aliveBeCount)")
    def table_name_gather = "test_rewrite_gather_boundary"
    createAndInsertSmallBatches(table_name_gather, 10)
    long totalSizeGather = getTotalSize(table_name_gather)
    int pipelineParallelism = 16
    sql """set parallel_pipeline_task_num=${pipelineParallelism}"""

    // Calculate target file size to achieve gather case:
    // target ~= totalSize / aliveBeCount keeps expectedFileCount at or below the BE count.
    long targetFileSizeGather = Math.max(1L, (long) Math.ceil(totalSizeGather * 1.0 / aliveBeCount))
    long expectedFileCountGather = (long) Math.ceil(totalSizeGather * 1.0 / targetFileSizeGather)

    // Due to file size granularity and metadata overhead, allow slight deviation from ideal.
    // The test validates expectedUpperBound function behavior near the gather threshold.
    assertTrue(expectedFileCountGather <= aliveBeCount + 1,
            "Expected gather case (with tolerance): expectedFileCount=${expectedFileCountGather}, aliveBeCount=${aliveBeCount}")

    def rewriteGatherResult = sql """
        ALTER TABLE ${catalog_name}.${db_name}.${table_name_gather}
        EXECUTE rewrite_data_files(
            "target-file-size-bytes" = "${targetFileSizeGather}",
            "min-input-files" = "1",
            "rewrite-all" = "true",
            "max-file-group-size-bytes" = "1099511627776"
        )
    """
    // Column [0][1] of the rewrite result is the number of newly added data files.
    int addedFilesGather = rewriteGatherResult[0][1] as int
    int expectedUpperGather = expectedUpperBound(expectedFileCountGather, aliveBeCount, pipelineParallelism)
    assertTrue(addedFilesGather > 0, "Expected added files > 0 in gather case")
    assertTrue(addedFilesGather <= expectedUpperGather,
            "addedFiles=${addedFilesGather}, expectedUpper=${expectedUpperGather}, "
            + "expectedFileCount=${expectedFileCountGather}")
+
+ //
=====================================================================================
+ // Case 2: expectedFileCount > aliveBeCount (non-GATHER), limit by
expected files per BE
+ //
=====================================================================================
+ logger.info("Starting non-gather case (expectedFileCount > aliveBeCount)")
+ def table_name_nongather = "test_rewrite_non_gather"
+ createAndInsertSmallBatches(table_name_nongather, 10)
+ long totalSizeNonGather = getTotalSize(table_name_nongather)
+ pipelineParallelism = 64
+ sql """set parallel_pipeline_task_num=${pipelineParallelism}"""
+
+ long targetFileSizeNonGather = Math.max(1L, (long)
Math.floor(totalSizeNonGather / (aliveBeCount + 1)))
+ long expectedFileCountNonGather = (long) Math.ceil(totalSizeNonGather *
1.0 / targetFileSizeNonGather)
+ assertTrue(expectedFileCountNonGather > aliveBeCount,
+ "Expected non-gather case:
expectedFileCount=${expectedFileCountNonGather}, aliveBeCount=${aliveBeCount}")
+
+ def rewriteNonGatherResult = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name_nongather}
+ EXECUTE rewrite_data_files(
+ "target-file-size-bytes" = "${targetFileSizeNonGather}",
+ "min-input-files" = "1",
+ "rewrite-all" = "true",
+ "max-file-group-size-bytes" = "1099511627776"
+ )
+ """
+ int addedFilesNonGather = rewriteNonGatherResult[0][1] as int
+ int expectedUpperNonGather =
expectedUpperBound(expectedFileCountNonGather, aliveBeCount,
pipelineParallelism)
+ assertTrue(addedFilesNonGather > 0, "Expected added files > 0 in
non-gather case")
+ assertTrue(addedFilesNonGather <= expectedUpperNonGather,
+ "addedFiles=${addedFilesNonGather},
expectedUpper=${expectedUpperNonGather}, "
+ + "expectedFileCount=${expectedFileCountNonGather}")
+
+ //
=====================================================================================
+ // Case 3: Moved from test_iceberg_rewrite_data_files, validates upper
bound dynamically
+ //
=====================================================================================
+ logger.info("Starting rewrite output file count cap test case (moved)")
+ def table_name_limit = "test_rewrite_file_count_cap"
+ createAndInsertSmallBatches(table_name_limit, 10)
+ long totalSizeLimit = getTotalSize(table_name_limit)
+ pipelineParallelism = 64
+ sql """set parallel_pipeline_task_num=${pipelineParallelism}"""
+
+ long targetFileSize = Math.max(1L, (long) Math.floor(totalSizeLimit /
(aliveBeCount + 1)))
+ long expectedFileCount = (long) Math.ceil(totalSizeLimit * 1.0 /
targetFileSize)
+ if (expectedFileCount <= aliveBeCount) {
+ targetFileSize = 1L
+ expectedFileCount = totalSizeLimit
+ }
+
+ def rewriteLimitResult = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name_limit}
+ EXECUTE rewrite_data_files(
+ "target-file-size-bytes" = "${targetFileSize}",
+ "min-input-files" = "1",
+ "rewrite-all" = "true",
+ "max-file-group-size-bytes" = "1099511627776"
+ )
+ """
+ int addedFilesCountLimit = rewriteLimitResult[0][1] as int
+ int expectedUpper = expectedUpperBound(expectedFileCount, aliveBeCount,
pipelineParallelism)
+ assertTrue(addedFilesCountLimit > 0, "Expected added files count > 0 after
rewrite")
+ assertTrue(addedFilesCountLimit <= expectedUpper,
+ "addedFilesCount=${addedFilesCountLimit},
expectedUpper=${expectedUpper}, "
+ + "aliveBeCount=${aliveBeCount},
expectedFileCount=${expectedFileCount}")
+
+ logger.info("Rewrite data files parallelism tests completed successfully")
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]