This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f5b84131df [opt](iceberg) optimize rewrite_data_files to avoid 
generating excessive small files (#60063)
1f5b84131df is described below

commit 1f5b84131dfb1758fc56abab00fca75a1723bf95
Author: Socrates <[email protected]>
AuthorDate: Mon Feb 2 17:53:08 2026 +0800

    [opt](iceberg) optimize rewrite_data_files to avoid generating excessive 
small files (#60063)
    
    ## What problem does this PR solve?
    
    During Iceberg rewrite_data_files operations, when BE count is large, an
    unexpected number of small files are generated.
    
    Problem: total_files = task_count × active_BE_count × partition_count
    
    ## What is changed and how it works?
    
    This change refines the parallelism strategy of Iceberg
    rewrite_data_files to maximize concurrency without producing
    extra output files.
    
    - If expectedFileCount < alive BE count, use GATHER. Read parallelism
    is no longer forced to 1; it is capped at
    min(defaultParallelism, expectedFileCount) so output files never exceed
    the expected count.
    - If expectedFileCount >= alive BE count, cap per‑BE parallelism to
    floor(expectedFileCount / aliveBeCount), and then take
    min with defaultParallelism, ensuring total writers do not exceed the
    expected file count.
    - Updated unit and regression tests to cover GATHER vs non‑GATHER paths
    and boundary cases.
    
    ## Benefits
    
    - Small data: reduce from 100+ files to ~1 file (90%+ reduction)
    - Adaptive strategy, no manual tuning needed
    
    ## Check List
    
    - [x] Code changes
    - [x] Test strategy described
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
 .../iceberg/rewrite/RewriteDataFileExecutor.java   |  20 +
 .../iceberg/rewrite/RewriteGroupTask.java          |  73 +++
 .../org/apache/doris/nereids/StatementContext.java |  18 +
 .../plans/physical/PhysicalIcebergTableSink.java   |  10 +
 .../iceberg/rewrite/RewriteGroupTaskTest.java      | 524 +++++++++++++++++++++
 .../action/test_iceberg_rewrite_data_files.groovy  |   3 +-
 ...t_iceberg_rewrite_data_files_parallelism.groovy | 205 ++++++++
 7 files changed, 852 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
index c900482710f..74afd0052b4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java
@@ -22,8 +22,10 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergTransaction;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.TransientTaskExecutor;
+import org.apache.doris.system.Backend;
 
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
@@ -69,6 +71,11 @@ public class RewriteDataFileExecutor {
         List<RewriteGroupTask> tasks = Lists.newArrayList();
         RewriteResultCollector resultCollector = new 
RewriteResultCollector(groups.size(), tasks);
 
+        // Get available BE count once before creating tasks
+        // This avoids calling getBackendsNumber() in each task during 
multi-threaded execution.
+        // Use compute group from connect context to align with actual BE 
selection for queries.
+        int availableBeCount = getAvailableBeCount();
+
         // Create tasks with callbacks
         for (RewriteDataGroup group : groups) {
             RewriteGroupTask task = new RewriteGroupTask(
@@ -77,6 +84,7 @@ public class RewriteDataFileExecutor {
                     dorisTable,
                     connectContext,
                     targetFileSizeBytes,
+                    availableBeCount,
                     new RewriteGroupTask.RewriteResultCallback() {
                         @Override
                         public void onTaskCompleted(Long taskId) {
@@ -153,6 +161,18 @@ public class RewriteDataFileExecutor {
         }
     }
 
+    private int getAvailableBeCount() throws UserException {
+        ComputeGroup computeGroup = connectContext.getComputeGroup();
+        List<Backend> backends = computeGroup.getBackendList();
+        int availableBeCount = 0;
+        for (Backend backend : backends) {
+            if (backend.isAlive()) {
+                availableBeCount++;
+            }
+        }
+        return availableBeCount;
+    }
+
     /**
      * Result collector for concurrent rewrite tasks
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
index 78277dff778..cd623528c76 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
@@ -64,6 +64,7 @@ public class RewriteGroupTask implements 
TransientTaskExecutor {
     private final Long taskId;
     private final AtomicBoolean isCanceled;
     private final AtomicBoolean isFinished;
+    private final int availableBeCount;
 
     // for canceling the task
     private StmtExecutor stmtExecutor;
@@ -73,12 +74,14 @@ public class RewriteGroupTask implements 
TransientTaskExecutor {
             IcebergExternalTable dorisTable,
             ConnectContext connectContext,
             long targetFileSizeBytes,
+            int availableBeCount,
             RewriteResultCallback resultCallback) {
         this.group = group;
         this.transactionId = transactionId;
         this.dorisTable = dorisTable;
         this.connectContext = connectContext;
         this.targetFileSizeBytes = targetFileSizeBytes;
+        this.availableBeCount = availableBeCount;
         this.resultCallback = resultCallback;
         this.taskId = UUID.randomUUID().getMostSignificantBits();
         this.isCanceled = new AtomicBoolean(false);
@@ -234,6 +237,11 @@ public class RewriteGroupTask implements 
TransientTaskExecutor {
         // Clone session variables from parent
         
taskContext.setSessionVariable(VariableMgr.cloneSessionVariable(connectContext.getSessionVariable()));
 
+        // Calculate optimal parallelism and determine distribution strategy
+        RewriteStrategy strategy = calculateRewriteStrategy();
+        // Pipeline engine uses parallelPipelineTaskNum to control instance 
parallelism.
+        taskContext.getSessionVariable().parallelPipelineTaskNum = 
strategy.parallelism;
+
         // Set env and basic identities
         taskContext.setEnv(Env.getCurrentEnv());
         taskContext.setDatabase(connectContext.getDatabase());
@@ -252,9 +260,74 @@ public class RewriteGroupTask implements 
TransientTaskExecutor {
         statementContext.setConnectContext(taskContext);
         taskContext.setStatementContext(statementContext);
 
+        // Set GATHER distribution flag if needed (for small data rewrite)
+        statementContext.setUseGatherForIcebergRewrite(strategy.useGather);
+
         return taskContext;
     }
 
+    /**
+     * Calculate optimal rewrite strategy including parallelism and 
distribution mode.
+     *
+     * The core idea is to precisely control the number of output files:
+     * 1. Calculate expected file count based on data size and target file size
+     * 2. If expected file count is less than available BE count, use GATHER
+     * to collect data to a single node, avoiding excessive writers
+     * 3. Otherwise, limit per-BE parallelism so total writers <= expected 
files
+     *
+     * @return RewriteStrategy containing parallelism and distribution settings
+     */
+    private RewriteStrategy calculateRewriteStrategy() {
+        // 1. Calculate expected output file count based on data size
+        long totalSize = group.getTotalSize();
+        int expectedFileCount = (int) Math.ceil((double) totalSize / 
targetFileSizeBytes);
+
+        // 2. Use available BE count passed from constructor
+        int availableBeCount = this.availableBeCount;
+        Preconditions.checkState(availableBeCount > 0,
+                "availableBeCount must be greater than 0 for rewrite task");
+
+        // 3. Get default parallelism from session variable (pipeline task num)
+        int defaultParallelism = 
connectContext.getSessionVariable().getParallelExecInstanceNum();
+
+        // 4. Determine strategy based on expected file count
+        boolean useGather = false;
+        int optimalParallelism;
+
+        // When expected files < available BEs, collect all data to single node
+        if (expectedFileCount < availableBeCount) {
+            // Small data volume: use GATHER to write to single node
+            // Keep parallelism <= expected files to avoid extra output files
+            useGather = true;
+            optimalParallelism = Math.max(1, Math.min(defaultParallelism, 
expectedFileCount));
+        } else {
+            // Larger data volume: limit per-BE parallelism so total writers 
<= expected files
+            int maxParallelismByFileCount = Math.max(1, expectedFileCount / 
availableBeCount);
+            optimalParallelism = Math.max(1, Math.min(defaultParallelism, 
maxParallelismByFileCount));
+        }
+
+        LOG.info("[Rewrite Task] taskId: {}, totalSize: {} bytes, 
targetFileSize: {} bytes, "
+                        + "expectedFileCount: {}, availableBeCount: {}, 
defaultParallelism: {}, "
+                        + "optimalParallelism: {}, useGather: {}",
+                taskId, totalSize, targetFileSizeBytes, expectedFileCount,
+                availableBeCount, defaultParallelism, optimalParallelism, 
useGather);
+
+        return new RewriteStrategy(optimalParallelism, useGather);
+    }
+
+    /**
+     * Strategy for rewrite operation containing parallelism and distribution 
settings.
+     */
+    private static class RewriteStrategy {
+        final int parallelism;
+        final boolean useGather;
+
+        RewriteStrategy(int parallelism, boolean useGather) {
+            this.parallelism = parallelism;
+            this.useGather = useGather;
+        }
+    }
+
     /**
      * Callback interface for task completion
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 3e1945965c9..92b7282a0ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -308,6 +308,9 @@ public class StatementContext implements Closeable {
     // IcebergScanNode
     // TODO: better solution?
     private List<org.apache.iceberg.FileScanTask> icebergRewriteFileScanTasks 
= null;
+    // For Iceberg rewrite operations: control whether to use GATHER 
distribution
+    // When true, data will be collected to a single node to avoid generating 
too many small files
+    private boolean useGatherForIcebergRewrite = false;
     private boolean hasNestedColumns;
 
     private final Set<CTEId> mustInlineCTE = new HashSet<>();
@@ -1154,6 +1157,21 @@ public class StatementContext implements Closeable {
         return tasks;
     }
 
+    /**
+     * Set whether to use GATHER distribution for Iceberg rewrite operations.
+     * When enabled, data will be collected to a single node to minimize 
output files.
+     */
+    public void setUseGatherForIcebergRewrite(boolean useGather) {
+        this.useGatherForIcebergRewrite = useGather;
+    }
+
+    /**
+     * Check if GATHER distribution should be used for Iceberg rewrite 
operations.
+     */
+    public boolean isUseGatherForIcebergRewrite() {
+        return this.useGatherForIcebergRewrite;
+    }
+
     public boolean isSkipPrunePredicate() {
         return skipPrunePredicate;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
index 95f91e7f013..12839e93493 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java
@@ -29,6 +29,7 @@ import 
org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.Statistics;
 
 import java.util.ArrayList;
@@ -110,6 +111,15 @@ public class PhysicalIcebergTableSink<CHILD_TYPE extends 
Plan> extends PhysicalB
      */
     @Override
     public PhysicalProperties getRequirePhysicalProperties() {
+        // For Iceberg rewrite operations with small data volume,
+        // use GATHER distribution to collect data to a single node
+        // This helps minimize the number of output files
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext != null && connectContext.getStatementContext() != 
null
+                && 
connectContext.getStatementContext().isUseGatherForIcebergRewrite()) {
+            return PhysicalProperties.GATHER;
+        }
+
         Set<String> partitionNames = targetTable.getPartitionNames();
         if (!partitionNames.isEmpty()) {
             List<Integer> columnIdx = new ArrayList<>();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTaskTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTaskTest.java
new file mode 100644
index 00000000000..93e92655da6
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTaskTest.java
@@ -0,0 +1,524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.rewrite;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.system.SystemInfoService;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Unit tests for RewriteGroupTask, specifically testing 
calculateRewriteStrategy logic
+ */
+public class RewriteGroupTaskTest {
+
+    @Mock
+    private RewriteDataGroup mockGroup;
+
+    @Mock
+    private IcebergExternalTable mockTable;
+
+    @Mock
+    private ConnectContext mockConnectContext;
+
+    private SessionVariable sessionVariable;
+
+    @Mock
+    private FileScanTask mockFileScanTask;
+
+    @Mock
+    private DataFile mockDataFile;
+
+    @Mock
+    private Env mockEnv;
+
+    @Mock
+    private SystemInfoService mockSystemInfoService;
+
+    private MockedStatic<Env> mockedStaticEnv;
+
+    private static final long MB = 1024 * 1024L;
+    private static final long GB = 1024 * 1024 * 1024L;
+
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+
+        // Setup common mocks
+        sessionVariable = new SessionVariable();
+        sessionVariable.parallelPipelineTaskNum = 8;
+        sessionVariable.maxInstanceNum = 64;
+        
Mockito.when(mockConnectContext.getSessionVariable()).thenReturn(sessionVariable);
+
+        // Mock Env and SystemInfoService
+        mockedStaticEnv = Mockito.mockStatic(Env.class);
+        mockedStaticEnv.when(Env::getCurrentEnv).thenReturn(mockEnv);
+        
mockedStaticEnv.when(Env::getCurrentSystemInfo).thenReturn(mockSystemInfoService);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        // Clean up static mock to avoid "already registered" errors
+        if (mockedStaticEnv != null) {
+            mockedStaticEnv.close();
+            mockedStaticEnv = null;
+        }
+    }
+
+    /**
+     * Test small data scenario - should use GATHER distribution
+     * Data: 500MB, Target file size: 512MB
+     * Expected: 1 file, useGather=true, parallelism=1
+     */
+    @Test
+    public void testCalculateRewriteStrategy_SmallData_UseGather() throws 
Exception {
+        // Setup: 500MB data, 512MB target file size -> expectedFileCount = 1
+        long totalSize = 500 * MB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        // Create task and invoke private method via reflection
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        // Verify strategy
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        Assertions.assertEquals(1, parallelism, "Parallelism should be 1 for 
small data");
+        Assertions.assertTrue(useGather, "Should use GATHER for small data 
(expected files <= 1)");
+    }
+
+    /**
+     * Test very small data scenario - data smaller than target file size
+     * Data: 100MB, Target file size: 512MB
+     * Expected: 1 file, useGather=true, parallelism=1
+     */
+    @Test
+    public void testCalculateRewriteStrategy_VerySmallData_UseGather() throws 
Exception {
+        long totalSize = 100 * MB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        Assertions.assertEquals(1, parallelism);
+        Assertions.assertTrue(useGather, "Should use GATHER for very small 
data");
+    }
+
+    /**
+     * Test medium data scenario - should use GATHER when expected files < 
available BEs
+     * Data: 5GB, Target file size: 512MB
+     * Expected: expectedFileCount=10, useGather=true, parallelism=min(8, 10)=8
+     */
+    @Test
+    public void 
testCalculateRewriteStrategy_MediumData_UseGatherWhenExpectedFilesNotExceedBeCount()
+            throws Exception {
+        long totalSize = 5 * GB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(5GB / 512MB) = ceil(10.0) = 10
+        // expectedFileCount < availableBeCount, so use GATHER
+        Assertions.assertEquals(8, parallelism, "Parallelism should be limited 
by default parallelism");
+        Assertions.assertTrue(useGather, "Should use GATHER when expected 
files < available BEs");
+    }
+
+    /**
+     * Test large data scenario - do NOT use GATHER when expected files == 
available BEs
+     * Data: 50GB, Target file size: 512MB
+     * Expected: expectedFileCount=100, useGather=false, parallelism=min(8, 
floor(100/100)=1)=1
+     */
+    @Test
+    public void 
testCalculateRewriteStrategy_LargeData_NoGatherWhenExpectedFilesEqualBeCount()
+            throws Exception {
+        long totalSize = 50 * GB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(50GB / 512MB) = 100
+        // expectedFileCount == availableBeCount, so do not use GATHER
+        Assertions.assertEquals(1, parallelism, "Parallelism should be limited 
by expected files / BE count");
+        Assertions.assertFalse(useGather, "Should NOT use GATHER when expected 
files == available BEs");
+    }
+
+    /**
+     * Test boundary case: exactly at threshold (1 file expected)
+     * Data: 512MB, Target file size: 512MB
+     * Expected: 1 file, useGather=true, parallelism=1
+     */
+    @Test
+    public void testCalculateRewriteStrategy_BoundaryCase_ExactlyOneFile() 
throws Exception {
+        long totalSize = 512 * MB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(512MB / 512MB) = 1
+        Assertions.assertEquals(1, parallelism);
+        Assertions.assertTrue(useGather, "Should use GATHER when exactly 1 
file expected");
+    }
+
+    /**
+     * Test boundary case: just over 1 file
+     * Data: 513MB, Target file size: 512MB
+     * Expected: expectedFileCount=2, useGather=true, parallelism=min(8, 2)=2
+     */
+    @Test
+    public void testCalculateRewriteStrategy_BoundaryCase_JustOverOneFile() 
throws Exception {
+        long totalSize = 513 * MB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(513MB / 512MB) = 2
+        // expectedFileCount < availableBeCount, so use GATHER
+        Assertions.assertEquals(2, parallelism);
+        Assertions.assertTrue(useGather, "Should use GATHER when expected 
files < available BEs");
+    }
+
+    /**
+     * Test boundary case: expected files equal available BEs - should not use 
GATHER
+     * Data: 512MB, Target file size: 64MB
+     * Expected: expectedFileCount=8, useGather=false, parallelism=min(8, 
floor(8/8)=1)=1
+     */
+    @Test
+    public void 
testCalculateRewriteStrategy_BoundaryCase_EqualToBeCount_NoGather() throws 
Exception {
+        long totalSize = 512 * MB;
+        long targetFileSizeBytes = 64 * MB;
+        int availableBeCount = 8;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(512MB / 64MB) = 8
+        // expectedFileCount == availableBeCount, so do not use GATHER
+        Assertions.assertEquals(1, parallelism);
+        Assertions.assertFalse(useGather);
+    }
+
+    /**
+     * Test GATHER with high default parallelism - should cap at expected file 
count
+     * Data: 513MB, Target file size: 512MB, Default parallelism: 100
+     * Expected: expectedFileCount=2, useGather=true, parallelism=min(100, 2)=2
+     */
+    @Test
+    public void testCalculateRewriteStrategy_GatherCapsAtExpectedFileCount() 
throws Exception {
+        long totalSize = 513 * MB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+        sessionVariable.parallelPipelineTaskNum = 100;
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(513MB / 512MB) = 2
+        // expectedFileCount < availableBeCount, so use GATHER
+        Assertions.assertEquals(2, parallelism);
+        Assertions.assertTrue(useGather);
+    }
+
+    /**
+     * Test with limited BE count - parallelism limited by available BEs
+     * Data: 10GB, Target file size: 512MB, Available BEs: 5
+     * Expected: expectedFileCount=20, useGather=false, parallelism=min(8, 
floor(20/5)=4)=4
+     */
+    @Test
+    public void testCalculateRewriteStrategy_LimitedByBeCount() throws 
Exception {
+        long totalSize = 10 * GB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 5; // Limited BE count
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(10GB / 512MB) = ceil(20.0) = 20
+        // maxParallelismByFileCount = floor(20 / 5) = 4
+        // optimalParallelism = min(8, 4) = 4
+        Assertions.assertEquals(4, parallelism, "Parallelism should be limited 
by expected files / BE count");
+        Assertions.assertFalse(useGather);
+    }
+
+    /**
+     * Test with high default parallelism - still use GATHER if expected files 
< available BEs
+     * Data: 2GB, Target file size: 512MB, Default parallelism: 100
+     * Expected: expectedFileCount=4, useGather=true, parallelism=min(100, 4)=4
+     */
+    @Test
+    public void 
testCalculateRewriteStrategy_UseGatherEvenWithHighDefaultParallelism() throws 
Exception {
+        long totalSize = 2 * GB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+        sessionVariable.parallelPipelineTaskNum = 100;
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(2GB / 512MB) = ceil(4.0) = 4
+        // expectedFileCount < availableBeCount, so use GATHER
+        Assertions.assertEquals(4, parallelism, "Parallelism should be limited 
by expected file count");
+        Assertions.assertTrue(useGather);
+    }
+
+    /**
+     * Test with very small target file size
+     * Data: 1GB, Target file size: 100MB
+     * Expected: expectedFileCount=11, useGather=true, parallelism=min(8, 11)=8
+     */
+    @Test
+    public void testCalculateRewriteStrategy_SmallTargetFileSize() throws 
Exception {
+        long totalSize = 1 * GB;
+        long targetFileSizeBytes = 100 * MB; // Small target file size
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(1GB / 100MB) = ceil(10.24) = 11
+        // expectedFileCount (11) <= availableBeCount (100), so use GATHER
+        Assertions.assertEquals(8, parallelism);
+        Assertions.assertTrue(useGather);
+    }
+
+    /**
+     * Test minimum parallelism guarantee
+     * Data: 0 bytes (edge case), Target file size: 512MB
+     * Expected: parallelism=1 (guaranteed minimum)
+     */
+    @Test
+    public void testCalculateRewriteStrategy_ZeroData_MinimumParallelism() 
throws Exception {
+        long totalSize = 0L;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        Mockito.when(mockGroup.getTasks()).thenReturn(Collections.emptyList());
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+
+        // expectedFileCount = ceil(0 / 512MB) = 0
+        // optimalParallelism = max(1, min(0, 100, 8)) = max(1, 0) = 1
+        Assertions.assertEquals(1, parallelism, "Parallelism should be at 
least 1");
+    }
+
+    /**
+     * Test invalid BE count - should throw when available BE count is zero
+     */
+    @Test
+    public void testCalculateRewriteStrategy_ZeroAvailableBe_Throws() throws 
Exception {
+        long totalSize = 1 * GB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 0;
+
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        
Mockito.when(mockGroup.getTasks()).thenReturn(Collections.singletonList(mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        java.lang.reflect.InvocationTargetException exception = 
Assertions.assertThrows(
+                java.lang.reflect.InvocationTargetException.class,
+                () -> invokeCalculateRewriteStrategy(task));
+
+        Assertions.assertTrue(exception.getCause() instanceof 
IllegalStateException);
+        
Assertions.assertTrue(exception.getCause().getMessage().contains("availableBeCount"),
+                "Exception message should mention availableBeCount");
+    }
+
+    /**
+     * Test realistic scenario with multiple partitions
+     * Data: 3GB across multiple partitions, Target file size: 512MB
+     * Expected: 6 files (3GB / 512MB divides exactly), useGather=true, parallelism=min(6, 100, 8)=6
+     */
+    @Test
+    public void testCalculateRewriteStrategy_MultiplePartitions() throws 
Exception {
+        long totalSize = 3 * GB;
+        long targetFileSizeBytes = 512 * MB;
+        int availableBeCount = 100;
+
+        // Multiple tasks representing different partitions
+        Mockito.when(mockGroup.getTotalSize()).thenReturn(totalSize);
+        Mockito.when(mockGroup.getTasks()).thenReturn(Arrays.asList(
+                mockFileScanTask, mockFileScanTask, mockFileScanTask));
+
+        RewriteGroupTask task = new RewriteGroupTask(
+                mockGroup, 1L, mockTable, mockConnectContext,
+                targetFileSizeBytes, availableBeCount, null);
+
+        Object strategy = invokeCalculateRewriteStrategy(task);
+
+        int parallelism = (int) getFieldValue(strategy, "parallelism");
+        boolean useGather = (boolean) getFieldValue(strategy, "useGather");
+
+        // expectedFileCount = ceil(3GB / 512MB) = ceil(6.0) = 6
+        // expectedFileCount (6) <= availableBeCount (100), so use GATHER
+        Assertions.assertEquals(6, parallelism);
+        Assertions.assertTrue(useGather);
+    }
+
+    // ========== Helper Methods ==========
+
+    /**
+     * Invoke private method calculateRewriteStrategy using reflection
+     */
+    private Object invokeCalculateRewriteStrategy(RewriteGroupTask task) 
throws Exception {
+        Method method = 
RewriteGroupTask.class.getDeclaredMethod("calculateRewriteStrategy");
+        method.setAccessible(true);
+        return method.invoke(task);
+    }
+
+    /**
+     * Get field value from RewriteStrategy object using reflection
+     */
+    private Object getFieldValue(Object obj, String fieldName) throws 
Exception {
+        Class<?> clazz = obj.getClass();
+        java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(obj);
+    }
+}
diff --git 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
index bc808bfae21..3d43089bd2e 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy
@@ -492,4 +492,5 @@ suite("test_iceberg_rewrite_data_files", 
"p0,external,doris,external_docker,exte
     assertTrue(northRecords[0][0] == 3, "NORTH region should still have 3 
records")
     
     logger.info("Specific partition rewrite test completed successfully")
-}
\ No newline at end of file
+
+}
diff --git 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files_parallelism.groovy
 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files_parallelism.groovy
new file mode 100644
index 00000000000..696fc89371d
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files_parallelism.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_rewrite_data_files_parallelism", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "test_iceberg_rewrite_data_files_parallelism"
+    String db_name = "test_db_parallelism"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """switch ${catalog_name}"""
+    sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
+    sql """use ${db_name}"""
+
+    def getAliveBeCount = {
+        def backends = sql_return_maparray "SHOW BACKENDS"
+        def alive = backends.findAll { be ->
+            def aliveVal = be.Alive
+            if (aliveVal == null) {
+                aliveVal = be.IsAlive
+            }
+            if (aliveVal == null) {
+                aliveVal = be.alive
+            }
+            return aliveVal != null && 
aliveVal.toString().equalsIgnoreCase("true")
+        }.size()
+        if (alive == 0 && backends.size() > 0) {
+            logger.warn("SHOW BACKENDS does not report Alive, fallback to 
total count")
+            alive = backends.size()
+        }
+        assertTrue(alive > 0, "No alive backend found for rewrite test")
+        return alive
+    }
+
+    def createAndInsertSmallBatches = { String tableName, int batches ->
+        sql """DROP TABLE IF EXISTS ${db_name}.${tableName}"""
+        sql """
+            CREATE TABLE ${db_name}.${tableName} (
+                id INT,
+                payload STRING
+            ) ENGINE=iceberg
+        """
+        (1..batches).each { batch ->
+            int base = (batch - 1) * 5
+            sql """
+                INSERT INTO ${db_name}.${tableName} VALUES
+                (${base + 1}, lpad('x', 1024, 'x')),
+                (${base + 2}, lpad('x', 1024, 'x')),
+                (${base + 3}, lpad('x', 1024, 'x')),
+                (${base + 4}, lpad('x', 1024, 'x')),
+                (${base + 5}, lpad('x', 1024, 'x'))
+            """
+        }
+    }
+
+    def getTotalSize = { String tableName ->
+        List<List<Object>> files = sql """SELECT file_size_in_bytes FROM 
${tableName}\$files"""
+        assertTrue(files.size() > 0, "Expected at least 1 file before rewrite")
+        return files.inject(0L) { acc, row -> acc + (row[0] as long) }
+    }
+
+    def expectedUpperBound = { long expectedFileCount, int aliveBeCount, int 
pipelineParallelism ->
+        boolean useGather = expectedFileCount <= aliveBeCount
+        if (useGather) {
+            return (int) Math.min(expectedFileCount, (long) 
pipelineParallelism)
+        }
+        long perBeParallelism = Math.max(1L, (long)(expectedFileCount / (long) 
aliveBeCount))
+        perBeParallelism = Math.min(perBeParallelism, (long) 
pipelineParallelism)
+        return (int) Math.min(expectedFileCount, perBeParallelism * 
aliveBeCount)
+    }
+
+    int aliveBeCount = getAliveBeCount()
+
+    // =====================================================================================
+    // Case 1: expectedFileCount <= aliveBeCount (GATHER), upper bound is expectedFileCount
+    // =====================================================================================
+    logger.info("Starting gather boundary case (expectedFileCount <= 
aliveBeCount)")
+    def table_name_gather = "test_rewrite_gather_boundary"
+    createAndInsertSmallBatches(table_name_gather, 10)
+    long totalSizeGather = getTotalSize(table_name_gather)
+    int pipelineParallelism = 16
+    sql """set parallel_pipeline_task_num=${pipelineParallelism}"""
+
+    // Calculate target file size to achieve gather case
+    long targetFileSizeGather = Math.max(1L, (long) Math.ceil(totalSizeGather 
* 1.0 / aliveBeCount))
+    long expectedFileCountGather = (long) Math.ceil(totalSizeGather * 1.0 / 
targetFileSizeGather)
+
+    // Due to file size granularity and metadata overhead, allow slight deviation from ideal.
+    // The test validates expectedUpperBound function behavior near the gather threshold.
+    assertTrue(expectedFileCountGather <= aliveBeCount + 1,
+        "Expected gather case (with tolerance): 
expectedFileCount=${expectedFileCountGather}, aliveBeCount=${aliveBeCount}")
+
+    def rewriteGatherResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name_gather}
+        EXECUTE rewrite_data_files(
+            "target-file-size-bytes" = "${targetFileSizeGather}",
+            "min-input-files" = "1",
+            "rewrite-all" = "true",
+            "max-file-group-size-bytes" = "1099511627776"
+        )
+    """
+    int addedFilesGather = rewriteGatherResult[0][1] as int
+    int expectedUpperGather = expectedUpperBound(expectedFileCountGather, 
aliveBeCount, pipelineParallelism)
+    assertTrue(addedFilesGather > 0, "Expected added files > 0 in gather case")
+    assertTrue(addedFilesGather <= expectedUpperGather,
+        "addedFiles=${addedFilesGather}, expectedUpper=${expectedUpperGather}, 
"
+        + "expectedFileCount=${expectedFileCountGather}")
+
+    // =====================================================================================
+    // Case 2: expectedFileCount > aliveBeCount (non-GATHER), limit by expected files per BE
+    // =====================================================================================
+    logger.info("Starting non-gather case (expectedFileCount > aliveBeCount)")
+    def table_name_nongather = "test_rewrite_non_gather"
+    createAndInsertSmallBatches(table_name_nongather, 10)
+    long totalSizeNonGather = getTotalSize(table_name_nongather)
+    pipelineParallelism = 64
+    sql """set parallel_pipeline_task_num=${pipelineParallelism}"""
+
+    long targetFileSizeNonGather = Math.max(1L, (long) 
Math.floor(totalSizeNonGather / (aliveBeCount + 1)))
+    long expectedFileCountNonGather = (long) Math.ceil(totalSizeNonGather * 
1.0 / targetFileSizeNonGather)
+    assertTrue(expectedFileCountNonGather > aliveBeCount,
+        "Expected non-gather case: 
expectedFileCount=${expectedFileCountNonGather}, aliveBeCount=${aliveBeCount}")
+
+    def rewriteNonGatherResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name_nongather}
+        EXECUTE rewrite_data_files(
+            "target-file-size-bytes" = "${targetFileSizeNonGather}",
+            "min-input-files" = "1",
+            "rewrite-all" = "true",
+            "max-file-group-size-bytes" = "1099511627776"
+        )
+    """
+    int addedFilesNonGather = rewriteNonGatherResult[0][1] as int
+    int expectedUpperNonGather = 
expectedUpperBound(expectedFileCountNonGather, aliveBeCount, 
pipelineParallelism)
+    assertTrue(addedFilesNonGather > 0, "Expected added files > 0 in 
non-gather case")
+    assertTrue(addedFilesNonGather <= expectedUpperNonGather,
+        "addedFiles=${addedFilesNonGather}, 
expectedUpper=${expectedUpperNonGather}, "
+        + "expectedFileCount=${expectedFileCountNonGather}")
+
+    // =====================================================================================
+    // Case 3: Moved from test_iceberg_rewrite_data_files, validates upper bound dynamically
+    // =====================================================================================
+    logger.info("Starting rewrite output file count cap test case (moved)")
+    def table_name_limit = "test_rewrite_file_count_cap"
+    createAndInsertSmallBatches(table_name_limit, 10)
+    long totalSizeLimit = getTotalSize(table_name_limit)
+    pipelineParallelism = 64
+    sql """set parallel_pipeline_task_num=${pipelineParallelism}"""
+
+    long targetFileSize = Math.max(1L, (long) Math.floor(totalSizeLimit / 
(aliveBeCount + 1)))
+    long expectedFileCount = (long) Math.ceil(totalSizeLimit * 1.0 / 
targetFileSize)
+    if (expectedFileCount <= aliveBeCount) {
+        targetFileSize = 1L
+        expectedFileCount = totalSizeLimit
+    }
+
+    def rewriteLimitResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name_limit}
+        EXECUTE rewrite_data_files(
+            "target-file-size-bytes" = "${targetFileSize}",
+            "min-input-files" = "1",
+            "rewrite-all" = "true",
+            "max-file-group-size-bytes" = "1099511627776"
+        )
+    """
+    int addedFilesCountLimit = rewriteLimitResult[0][1] as int
+    int expectedUpper = expectedUpperBound(expectedFileCount, aliveBeCount, 
pipelineParallelism)
+    assertTrue(addedFilesCountLimit > 0, "Expected added files count > 0 after 
rewrite")
+    assertTrue(addedFilesCountLimit <= expectedUpper,
+        "addedFilesCount=${addedFilesCountLimit}, 
expectedUpper=${expectedUpper}, "
+        + "aliveBeCount=${aliveBeCount}, 
expectedFileCount=${expectedFileCount}")
+
+    logger.info("Rewrite data files parallelism tests completed successfully")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to