This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a78c1d98b90 [fix](mc) fix memory leak and optimize large data write 
for MaxCompute connector (#61245)
a78c1d98b90 is described below

commit a78c1d98b904706925ed8e3f8dcbd252e5c10431
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Mar 22 16:59:12 2026 -0700

    [fix](mc) fix memory leak and optimize large data write for MaxCompute 
connector (#61245)
    
    ### What problem does this PR solve?
    
    Fix:
    - Fix potential memory leak in MaxComputeJniScanner by closing
      currentSplitReader in close().
    - Fix potential memory leak in MaxComputeJniWriter by restructuring
      close() with try-finally to ensure allocator is always closed even
      when batchWriter.commit() throws. Also close VectorSchemaRoot after
      each batch write.
    - Fix maxWriteBatchRows parameter key mismatch between BE
      ("max_write_batch_rows") and JNI ("mc.max_write_batch_rows"),
      which caused user-customized values to be silently ignored.
    
    Optimization:
    - Split large Arrow batches into smaller chunks (controlled by
      mc.max_write_batch_rows, default 4096) to avoid HTTP 413 Request
      Entity Too Large errors from MaxCompute Storage API.
    - Skip unnecessary SORT node for static partition INSERT, since all
      data goes to a single known partition and no dynamic routing is
      needed.
    - Enable ZSTD compression for Arrow data transfer to reduce network
      bandwidth.
    
    New catalog properties:
    - mc.max_write_batch_rows: max rows per Arrow batch for write
      (default: 4096)
    - mc.max_field_size_bytes: max field size in bytes for write session
      (default: 8MB)
    
    Co-authored-by: daidai <[email protected]>
---
 .../sink/writer/maxcompute/vmc_table_writer.cpp    |  43 +-
 .../exec/sink/writer/maxcompute/vmc_table_writer.h |   6 +
 .../doris/maxcompute/MaxComputeJniScanner.java     | 162 ++++++-
 .../doris/maxcompute/MaxComputeJniWriter.java      | 521 ++++++++++++++++++---
 .../doris/common/maxcompute/MCProperties.java      |   6 +
 .../doris/datasource/maxcompute/MCTransaction.java |   9 +-
 .../maxcompute/MaxComputeExternalCatalog.java      |  16 +
 .../physical/PhysicalMaxComputeTableSink.java      |  13 +
 .../apache/doris/planner/MaxComputeTableSink.java  |   1 +
 gensrc/thrift/DataSinks.thrift                     |   1 +
 .../maxcompute/write/test_mc_write_large_data.out  |  10 +
 .../write/test_mc_write_large_data.groovy          |  49 ++
 .../write/test_mc_write_static_partitions.groovy   |  70 +++
 13 files changed, 810 insertions(+), 97 deletions(-)

diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp 
b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
index 323bbfbe231..3abb836d87d 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
@@ -117,6 +117,9 @@ std::map<std::string, std::string> 
VMCTableWriter::_build_base_writer_params() {
     if (_mc_sink.__isset.retry_count) {
         params["retry_count"] = std::to_string(_mc_sink.retry_count);
     }
+    if (_mc_sink.__isset.max_write_batch_rows) {
+        params["max_write_batch_rows"] = 
std::to_string(_mc_sink.max_write_batch_rows);
+    }
     return params;
 }
 
@@ -156,13 +159,10 @@ Status VMCTableWriter::write(RuntimeState* state, Block& 
block) {
             it = _partitions_to_writers.find(_static_partition_spec);
         }
         output_block.erase(_non_write_columns_indices);
-        return it->second->write(output_block);
+        return _write_block_in_chunks(it->second, output_block);
     }
 
     // Case 2: Dynamic partition or non-partitioned table
-    // For dynamic partitions, MaxCompute Storage API (with 
DynamicPartitionOptions) expects
-    // partition column values in the Arrow data and handles routing 
internally.
-    // So we send the full block including partition columns to a single 
writer.
     std::string partition_key = "";
     auto it = _partitions_to_writers.find(partition_key);
     if (it == _partitions_to_writers.end()) {
@@ -171,7 +171,40 @@ Status VMCTableWriter::write(RuntimeState* state, Block& 
block) {
         _partitions_to_writers.insert({partition_key, writer});
         it = _partitions_to_writers.find(partition_key);
     }
-    return it->second->write(output_block);
+    return _write_block_in_chunks(it->second, output_block);
+}
+
+Status VMCTableWriter::_write_block_in_chunks(const 
std::shared_ptr<VMCPartitionWriter>& writer,
+                                              Block& output_block) {
+    // Limit per-JNI data to MAX_WRITE_BLOCK_BYTES. When data source is not MC 
scanner
+    // (e.g. Doris internal table, Hive, JDBC), the upstream batch_size 
controls Block
+    // row count but not byte size. With large rows (585KB/row), a 4096-row 
Block is
+    // ~2.4GB. Splitting ensures each JNI call processes bounded data, 
limiting Arrow
+    // and SDK native memory per call.
+    static constexpr size_t MAX_WRITE_BLOCK_BYTES = 256 * 1024 * 1024; // 256MB
+
+    const size_t block_bytes = output_block.allocated_bytes();
+    const size_t rows = output_block.rows();
+
+    if (block_bytes <= MAX_WRITE_BLOCK_BYTES || rows <= 1) {
+        return writer->write(output_block);
+    }
+
+    const size_t bytes_per_row = block_bytes / rows;
+    const size_t max_rows = std::max(size_t(1), MAX_WRITE_BLOCK_BYTES / 
bytes_per_row);
+
+    for (size_t offset = 0; offset < rows; offset += max_rows) {
+        const size_t num_rows = std::min(max_rows, rows - offset);
+        Block sub_block = output_block.clone_empty();
+        auto columns = sub_block.mutate_columns();
+        for (size_t i = 0; i < columns.size(); i++) {
+            
columns[i]->insert_range_from(*output_block.get_by_position(i).column, offset,
+                                          num_rows);
+        }
+        sub_block.set_columns(std::move(columns));
+        RETURN_IF_ERROR(writer->write(sub_block));
+    }
+    return Status::OK();
 }
 
 Status VMCTableWriter::close(Status status) {
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h 
b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h
index 24075e47a06..643b28f1cc0 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h
@@ -54,6 +54,12 @@ public:
 private:
     std::shared_ptr<VMCPartitionWriter> _create_partition_writer(const 
std::string& partition_spec);
 
+    // Split large blocks into sub-blocks before JNI to limit Arrow and SDK
+    // native memory. Needed when data source is not MC scanner and blocks
+    // may exceed 256MB (e.g. batch_size=4096 with 585KB/row = 2.4GB).
+    Status _write_block_in_chunks(const std::shared_ptr<VMCPartitionWriter>& 
writer,
+                                  Block& output_block);
+
     std::map<std::string, std::string> _build_base_writer_params();
 
     TDataSink _t_sink;
diff --git 
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index ce1b3941dca..336991f3802 100644
--- 
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -33,6 +33,7 @@ import com.aliyun.odps.table.read.split.InputSplit;
 import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
 import com.aliyun.odps.table.read.split.impl.RowRangeInputSplit;
 import com.google.common.base.Strings;
+import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.log4j.Logger;
@@ -60,6 +61,13 @@ public class MaxComputeJniScanner extends JniScanner {
 
     private static final Logger LOG = 
Logger.getLogger(MaxComputeJniScanner.class);
 
+    // 256MB byte budget per scanner batch — limits the C++ Block size at the 
source.
+    // With large rows (e.g. 585KB/row STRING), batch_size=4096 would create 
~2.4GB Blocks.
+    // The pipeline's AsyncResultWriter queues up to 3 Blocks per instance, 
and with
+    // parallel_pipeline_task_num instances, total queue memory = instances * 
3 * block_size.
+    // 256MB keeps queue memory manageable: 5 instances * 3 * 256MB = 3.8GB.
+    private static final long MAX_BATCH_BYTES = 256 * 1024 * 1024L;
+
     private static final String ACCESS_KEY = "access_key";
     private static final String SECRET_KEY = "secret_key";
     private static final String ENDPOINT = "endpoint";
@@ -90,6 +98,8 @@ public class MaxComputeJniScanner extends JniScanner {
     private String table;
 
     private SplitReader<VectorSchemaRoot> currentSplitReader;
+    private VectorSchemaRoot currentBatch = null;
+    private int currentBatchRowOffset = 0;
     private MaxComputeColumnValue columnValue;
 
     private Map<String, Integer> readColumnsToId;
@@ -215,8 +225,17 @@ public class MaxComputeJniScanner extends JniScanner {
 
     @Override
     public void close() throws IOException {
+        if (currentSplitReader != null) {
+            try {
+                currentSplitReader.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close MaxCompute split reader for table " 
+ project + "." + table, e);
+            }
+        }
         startOffset = -1;
         splitSize = -1;
+        currentBatch = null;
+        currentBatchRowOffset = 0;
         currentSplitReader = null;
         settings = null;
         scan = null;
@@ -234,45 +253,74 @@ public class MaxComputeJniScanner extends JniScanner {
         return readVectors(expectedRows);
     }
 
+    private VectorSchemaRoot getNextBatch() throws IOException {
+        try {
+            if (!currentSplitReader.hasNext()) {
+                currentSplitReader.close();
+                currentSplitReader = null;
+                return null;
+            }
+            return currentSplitReader.get();
+        } catch (Exception e) {
+            String errorMsg = "MaxComputeJniScanner readVectors get batch 
fail";
+            LOG.warn(errorMsg, e);
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
     private int readVectors(int expectedRows) throws IOException {
         int curReadRows = 0;
+        long accumulatedBytes = 0;
         while (curReadRows < expectedRows) {
-            try {
-                if (!currentSplitReader.hasNext()) {
-                    currentSplitReader.close();
-                    currentSplitReader = null;
+            // Stop early if accumulated variable-width bytes approach int32 
limit
+            if (accumulatedBytes >= MAX_BATCH_BYTES) {
+                break;
+            }
+            if (currentBatch == null) {
+                currentBatch = getNextBatch();
+                if (currentBatch == null || currentBatch.getRowCount() == 0) {
+                    currentBatch = null;
                     break;
                 }
-            } catch (Exception e) {
-                String errorMsg = "MaxComputeJniScanner readVectors hasNext 
fail";
-                LOG.warn(errorMsg, e);
-                throw new IOException(e.getMessage(), e);
+                currentBatchRowOffset = 0;
             }
-
             try {
-                VectorSchemaRoot data = currentSplitReader.get();
-                if (data.getRowCount() == 0) {
+                int rowsToAppend = Math.min(expectedRows - curReadRows,
+                        currentBatch.getRowCount() - currentBatchRowOffset);
+                List<FieldVector> fieldVectors = 
currentBatch.getFieldVectors();
+
+                // Limit rows to avoid int32 overflow in VectorColumn's String 
byte buffer
+                rowsToAppend = limitRowsByVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend,
+                        MAX_BATCH_BYTES - accumulatedBytes);
+                if (rowsToAppend <= 0) {
                     break;
                 }
 
-                List<FieldVector> fieldVectors = data.getFieldVectors();
-                int batchRows = 0;
                 long startTime = System.nanoTime();
                 for (FieldVector column : fieldVectors) {
                     Integer readColumnId = 
readColumnsToId.get(column.getName());
-                    batchRows = column.getValueCount();
                     if (readColumnId == null) {
                         continue;
                     }
                     columnValue.reset(column);
-                    for (int j = 0; j < batchRows; j++) {
+                    for (int j = currentBatchRowOffset; j < 
currentBatchRowOffset + rowsToAppend; j++) {
                         columnValue.setColumnIdx(j);
                         appendData(readColumnId, columnValue);
                     }
                 }
                 appendDataTime += System.nanoTime() - startTime;
 
-                curReadRows += batchRows;
+                // Track bytes for the rows just appended
+                accumulatedBytes += estimateVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend);
+
+                currentBatchRowOffset += rowsToAppend;
+                curReadRows += rowsToAppend;
+                if (currentBatchRowOffset >= currentBatch.getRowCount()) {
+                    currentBatch = null;
+                    currentBatchRowOffset = 0;
+                }
             } catch (Exception e) {
                 String errorMsg = String.format("MaxComputeJniScanner Fail to 
read arrow data. "
                         + "curReadRows = {}, expectedRows = {}", curReadRows, 
expectedRows);
@@ -280,9 +328,91 @@ public class MaxComputeJniScanner extends JniScanner {
                 throw new RuntimeException(errorMsg, e);
             }
         }
+        if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows < 
expectedRows) {
+            LOG.debug("readVectors: returning " + curReadRows + " rows 
(limited by byte budget)"
+                    + ", totalVarWidthBytes=" + accumulatedBytes
+                    + ", expectedRows=" + expectedRows);
+        }
         return curReadRows;
     }
 
+    /**
+     * Limit the number of rows to append so that no single variable-width 
column
+     * exceeds the remaining byte budget. This prevents int32 overflow in
+     * VectorColumn's appendIndex for String/Binary child byte arrays.
+     *
+     * Uses Arrow's offset buffer for O(1)-per-row byte size calculation —
+     * no data copying involved.
+     */
+    private int limitRowsByVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int maxRows, long remainingBudget) {
+        if (remainingBudget <= 0) {
+            return 0;
+        }
+        int safeRows = maxRows;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                // Find how many rows fit within the budget for THIS column
+                int rows = findMaxRowsWithinBudget(vec, offset, maxRows, 
remainingBudget);
+                safeRows = Math.min(safeRows, rows);
+            }
+        }
+        // Always allow at least 1 row to make progress, even if it exceeds 
budget
+        return Math.max(1, safeRows);
+    }
+
+    /**
+     * Binary search for the maximum number of rows starting at 'offset'
+     * whose total bytes in the variable-width vector fit within 'budget'.
+     */
+    private int findMaxRowsWithinBudget(BaseVariableWidthVector vec,
+            int offset, int maxRows, long budget) {
+        if (maxRows <= 0) {
+            return 0;
+        }
+        // Total bytes for all maxRows
+        long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset + 
maxRows) * 4)
+                - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+        if (totalBytes <= budget) {
+            return maxRows;
+        }
+        // Binary search for the cutoff point
+        int lo = 1;
+        int hi = maxRows - 1;
+        int startOff = vec.getOffsetBuffer().getInt((long) offset * 4);
+        while (lo <= hi) {
+            int mid = lo + (hi - lo) / 2;
+            long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset + 
mid) * 4) - startOff;
+            if (bytes <= budget) {
+                lo = mid + 1;
+            } else {
+                hi = mid - 1;
+            }
+        }
+        // 'hi' is the largest count whose bytes <= budget (could be 0)
+        return hi;
+    }
+
+    /**
+     * Estimate total variable-width bytes for the given row range across all 
columns.
+     * Returns the max bytes of any single column (since each column has its 
own
+     * VectorColumn child buffer and the overflow is per-column).
+     */
+    private long estimateVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int rows) {
+        long maxColumnBytes = 0;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                long bytes = (long) vec.getOffsetBuffer().getInt((long) 
(offset + rows) * 4)
+                        - (long) vec.getOffsetBuffer().getInt((long) offset * 
4);
+                maxColumnBytes = Math.max(maxColumnBytes, bytes);
+            }
+        }
+        return maxColumnBytes;
+    }
+
     private static Object deserialize(String serializedString) throws 
IOException, ClassNotFoundException {
         byte[] serializedBytes = Base64.getDecoder().decode(serializedString);
         ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(serializedBytes);
diff --git 
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 86b849cd627..4e0f2570a82 100644
--- 
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++ 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -18,6 +18,7 @@
 package org.apache.doris.maxcompute;
 
 import org.apache.doris.common.jni.JniWriter;
+import org.apache.doris.common.jni.vec.VectorColumn;
 import org.apache.doris.common.jni.vec.VectorTable;
 import org.apache.doris.common.maxcompute.MCUtils;
 
@@ -25,6 +26,7 @@ import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsType;
 import com.aliyun.odps.table.configuration.ArrowOptions;
 import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
+import com.aliyun.odps.table.configuration.CompressionCodec;
 import com.aliyun.odps.table.configuration.RestOptions;
 import com.aliyun.odps.table.configuration.WriterOptions;
 import com.aliyun.odps.table.enviroment.Credentials;
@@ -92,18 +94,35 @@ public class MaxComputeJniWriter extends JniWriter {
     private static final String CONNECT_TIMEOUT = "connect_timeout";
     private static final String READ_TIMEOUT = "read_timeout";
     private static final String RETRY_COUNT = "retry_count";
+    private static final String MAX_WRITE_BATCH_ROWS = "max_write_batch_rows";
 
     private final Map<String, String> params;
+
+    // 128MB batch threshold — controls peak Arrow native memory per batch.
+    // Arrow uses sun.misc.Unsafe.allocateMemory() which is invisible to JVM 
metrics.
+    // Each batch temporarily holds ~batchDataBytes of native memory.
+    // With 3 concurrent writers, total Arrow native = 3 * 128MB = ~384MB.
+    // Using 1GB was too large: 3 writers * 1GB = 3GB invisible native memory.
+    private static final long MAX_ARROW_BATCH_BYTES = 128 * 1024 * 1024L;
+
+    // Segmented commit: commit and recreate batchWriter every N rows to 
prevent
+    // MaxCompute SDK native memory accumulation. Without this, the SDK buffers
+    // all written data internally, causing process RSS to grow linearly with
+    // total data volume until SIGSEGV.
+    private static final long ROWS_PER_SEGMENT = 5000;
+
     private final String endpoint;
     private final String project;
     private final String tableName;
     private final String quota;
     private String writeSessionId;
     private long blockId;
+    private long nextBlockId; // For creating new segments with unique blockIds
     private String partitionSpec;
     private int connectTimeout;
     private int readTimeout;
     private int retryCount;
+    private int maxWriteBatchRows;
 
     // Storage API objects
     private TableBatchWriteSession writeSession;
@@ -111,7 +130,14 @@ public class MaxComputeJniWriter extends JniWriter {
     private BufferAllocator allocator;
     private List<TypeInfo> columnTypeInfos;
     private List<String> columnNames;
-    private WriterCommitMessage commitMessage;
+    // Collect commit messages from all segments (each batchWriter commit 
produces one)
+    private final List<WriterCommitMessage> commitMessages = new 
java.util.ArrayList<>();
+
+    // Per-segment row counter (resets after each segment commit)
+    private long segmentRows = 0;
+
+    // Writer options cached for creating new batchWriters
+    private WriterOptions writerOptions;
 
     // Statistics
     private long writtenRows = 0;
@@ -127,10 +153,12 @@ public class MaxComputeJniWriter extends JniWriter {
         this.writeSessionId = 
Objects.requireNonNull(params.get(WRITE_SESSION_ID),
                 "required property '" + WRITE_SESSION_ID + "'.");
         this.blockId = Long.parseLong(params.getOrDefault(BLOCK_ID, "0"));
+        this.nextBlockId = this.blockId + 1; // Reserve blockId for first 
writer, increment for segments
         this.partitionSpec = params.getOrDefault(PARTITION_SPEC, "");
         this.connectTimeout = 
Integer.parseInt(params.getOrDefault(CONNECT_TIMEOUT, "10"));
         this.readTimeout = Integer.parseInt(params.getOrDefault(READ_TIMEOUT, 
"120"));
         this.retryCount = Integer.parseInt(params.getOrDefault(RETRY_COUNT, 
"4"));
+        this.maxWriteBatchRows = 
Integer.parseInt(params.getOrDefault(MAX_WRITE_BATCH_ROWS, "4096"));
     }
 
     @Override
@@ -184,9 +212,10 @@ public class MaxComputeJniWriter extends JniWriter {
 
             allocator = new RootAllocator(Long.MAX_VALUE);
 
-            // Create Arrow writer for this block
-            WriterOptions writerOptions = WriterOptions.newBuilder()
+            // Cache writer options for creating new batchWriters in segments
+            writerOptions = WriterOptions.newBuilder()
                     .withSettings(settings)
+                    .withCompressionCodec(CompressionCodec.ZSTD)
                     .build();
             batchWriter = writeSession.createArrowWriter(blockId,
                     WriterAttemptId.of(0), writerOptions);
@@ -210,19 +239,44 @@ public class MaxComputeJniWriter extends JniWriter {
         }
 
         try {
-            Object[][] data = inputTable.getMaterializedData();
+            // Stream data directly from off-heap VectorColumn to Arrow 
vectors.
+            // Unlike the previous getMaterializedData() approach that created
+            // Object[][] (with String objects for STRING columns causing 3x 
memory
+            // amplification), this reads bytes directly from VectorColumn and 
writes
+            // to Arrow, keeping peak heap usage per batch to O(batch_rows * 
row_size)
+            // instead of O(2 * batch_rows * row_size).
+            int rowOffset = 0;
+            while (rowOffset < numRows) {
+                int batchRows = Math.min(maxWriteBatchRows, numRows - 
rowOffset);
 
-            // Get a pre-allocated VectorSchemaRoot from the batch writer
-            VectorSchemaRoot root = batchWriter.newElement();
-            root.setRowCount(numRows);
+                // For variable-width columns, check byte budget to avoid 
Arrow int32 overflow
+                batchRows = limitWriteBatchByBytesStreaming(inputTable, 
numCols,
+                        rowOffset, batchRows);
 
-            for (int col = 0; col < numCols && col < columnTypeInfos.size(); 
col++) {
-                OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
-                fillArrowVector(root, col, odpsType, data[col], numRows);
-            }
+                VectorSchemaRoot root = batchWriter.newElement();
+                try {
+                    root.setRowCount(batchRows);
 
-            batchWriter.write(root);
-            writtenRows += numRows;
+                    for (int col = 0; col < numCols && col < 
columnTypeInfos.size(); col++) {
+                        OdpsType odpsType = 
columnTypeInfos.get(col).getOdpsType();
+                        fillArrowVectorStreaming(root, col, odpsType,
+                                inputTable.getColumn(col), rowOffset, 
batchRows);
+                    }
+
+                    batchWriter.write(root);
+                } finally {
+                    root.close();
+                }
+
+                writtenRows += batchRows;
+                segmentRows += batchRows;
+                rowOffset += batchRows;
+
+                // Segmented commit: rotate batchWriter to release SDK native 
memory
+                if (segmentRows >= ROWS_PER_SEGMENT) {
+                    rotateBatchWriter();
+                }
+            }
         } catch (Exception e) {
             String errorMsg = "Failed to write data to MaxCompute table " + 
project + "." + tableName;
             LOG.error(errorMsg, e);
@@ -230,17 +284,315 @@ public class MaxComputeJniWriter extends JniWriter {
         }
     }
 
+    /**
+     * Commit current batchWriter and create a new one with a fresh blockId.
+     * This forces the MaxCompute SDK to flush and release internal native 
memory
+     * buffers that accumulate during writes. Without rotation, the SDK holds 
all
+     * serialized Arrow data in native memory until close(), causing process 
RSS
+     * to grow linearly with total data volume.
+     */
+    private void rotateBatchWriter() throws IOException {
+        try {
+            // 1. Commit current batchWriter and save its commit message
+            WriterCommitMessage msg = batchWriter.commit();
+            commitMessages.add(msg);
+            batchWriter = null;
+
+            // 2. Close current allocator to release Arrow native memory
+            allocator.close();
+            allocator = null;
+
+            // 3. Create new allocator and batchWriter with a new blockId
+            long newBlockId = nextBlockId++;
+            allocator = new RootAllocator(Long.MAX_VALUE);
+            batchWriter = writeSession.createArrowWriter(newBlockId,
+                    WriterAttemptId.of(0), writerOptions);
+
+            LOG.info("Rotated batchWriter: oldBlockId=" + blockId + ", 
newBlockId=" + newBlockId
+                    + ", totalCommitMessages=" + commitMessages.size()
+                    + ", totalWrittenRows=" + writtenRows);
+
+            blockId = newBlockId;
+            segmentRows = 0;
+        } catch (Exception e) {
+            throw new IOException("Failed to rotate batchWriter for table "
+                    + project + "." + tableName, e);
+        }
+    }
+
+
+    private boolean isVariableWidthType(OdpsType type) {
+        return type == OdpsType.STRING || type == OdpsType.VARCHAR
+                || type == OdpsType.CHAR || type == OdpsType.BINARY;
+    }
+
+    /**
+     * Limit write batch size by estimating variable-width column bytes 
directly
+     * from the off-heap VectorColumn, without materializing data to Java heap.
+     */
+    private int limitWriteBatchByBytesStreaming(VectorTable inputTable, int 
numCols,
+                                               int rowOffset, int batchRows) {
+        for (int col = 0; col < numCols && col < columnTypeInfos.size(); 
col++) {
+            OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
+            if (!isVariableWidthType(odpsType)) {
+                continue;
+            }
+            VectorColumn vc = inputTable.getColumn(col);
+            batchRows = findMaxRowsForColumnStreaming(vc, rowOffset, 
batchRows, MAX_ARROW_BATCH_BYTES);
+            if (batchRows <= 1) {
+                return Math.max(1, batchRows);
+            }
+        }
+        return batchRows;
+    }
+
+    /**
+     * Find the maximum number of rows (from rowOffset) whose total byte size
+     * fits within budget, by reading offset metadata directly from 
VectorColumn.
+     */
+    private int findMaxRowsForColumnStreaming(VectorColumn vc, int rowOffset, 
int maxRows, long budget) {
+        long totalBytes = estimateColumnBytesStreaming(vc, rowOffset, maxRows);
+        if (totalBytes <= budget) {
+            return maxRows;
+        }
+        int rows = maxRows;
+        while (rows > 1) {
+            rows = rows / 2;
+            totalBytes = estimateColumnBytesStreaming(vc, rowOffset, rows);
+            if (totalBytes <= budget) {
+                int lo = rows;
+                int hi = Math.min(rows * 2, maxRows);
+                while (lo < hi) {
+                    int mid = lo + (hi - lo + 1) / 2;
+                    if (estimateColumnBytesStreaming(vc, rowOffset, mid) <= 
budget) {
+                        lo = mid;
+                    } else {
+                        hi = mid - 1;
+                    }
+                }
+                return lo;
+            }
+        }
+        return 1;
+    }
+
+    /**
+     * Estimate total bytes for a range of rows in a VectorColumn by reading
+     * the offset array directly from off-heap memory, without creating any
+     * byte[] objects. This is O(1) per row (just offset subtraction).
+     */
+    private long estimateColumnBytesStreaming(VectorColumn vc, int rowOffset, 
int rows) {
+        long total = 0;
+        long offsetAddr = vc.offsetAddress();
+        for (int i = rowOffset; i < rowOffset + rows; i++) {
+            if (!vc.isNullAt(i)) {
+                // String offsets are stored as int32 in VectorColumn
+                int startOff = i == 0 ? 0
+                        : 
org.apache.doris.common.jni.utils.OffHeap.getInt(null, offsetAddr + 4L * (i - 
1));
+                int endOff = 
org.apache.doris.common.jni.utils.OffHeap.getInt(null, offsetAddr + 4L * i);
+                total += (endOff - startOff);
+            }
+        }
+        return total;
+    }
+
    /**
     * Fill an Arrow vector by reading data directly from a VectorColumn,
     * one row at a time. For STRING columns, this reads bytes directly
     * (getBytesWithOffset) instead of creating String objects, eliminating
     * the String -> byte[] double-copy that caused heap exhaustion.
     *
     * @param root      destination Arrow batch; the vector at colIdx is filled
     * @param colIdx    column index shared by the Arrow root and the source column
     * @param odpsType  MaxCompute type selecting the per-value copy strategy
     * @param vc        source Doris column (off-heap backed)
     * @param rowOffset first row in vc to copy
     * @param numRows   number of rows to copy starting at rowOffset
     */
    private void fillArrowVectorStreaming(VectorSchemaRoot root, int colIdx, OdpsType odpsType,
                                          VectorColumn vc, int rowOffset, int numRows) {
        switch (odpsType) {
            case BOOLEAN: {
                BitVector vec = (BitVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        // Arrow BitVector stores booleans as 1/0 bits.
                        vec.set(i, vc.getBoolean(rowOffset + i) ? 1 : 0);
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case TINYINT: {
                TinyIntVector vec = (TinyIntVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        vec.set(i, vc.getByte(rowOffset + i));
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case SMALLINT: {
                SmallIntVector vec = (SmallIntVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        vec.set(i, vc.getShort(rowOffset + i));
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case INT: {
                IntVector vec = (IntVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        vec.set(i, vc.getInt(rowOffset + i));
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case BIGINT: {
                BigIntVector vec = (BigIntVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        vec.set(i, vc.getLong(rowOffset + i));
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case FLOAT: {
                Float4Vector vec = (Float4Vector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        vec.set(i, vc.getFloat(rowOffset + i));
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case DOUBLE: {
                Float8Vector vec = (Float8Vector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        vec.set(i, vc.getDouble(rowOffset + i));
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case DECIMAL: {
                DecimalVector vec = (DecimalVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        // NOTE(review): DecimalVector.set expects the value's scale
                        // to match the vector's declared scale — presumably vc
                        // already stores the target scale; confirm against schema.
                        vec.set(i, vc.getDecimal(rowOffset + i));
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case STRING:
            case VARCHAR:
            case CHAR: {
                // KEY FIX: Read bytes directly from off-heap, no String creation.
                // Previously: getMaterializedData -> String[] -> toString().getBytes() -> Arrow
                // Now: getBytesWithOffset() -> byte[] -> Arrow (1 copy instead of 3)
                VarCharVector vec = (VarCharVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        byte[] bytes = vc.getBytesWithOffset(rowOffset + i);
                        // setSafe grows the data buffer as needed for long values.
                        vec.setSafe(i, bytes);
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case DATE: {
                DateDayVector vec = (DateDayVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        LocalDate date = vc.getDate(rowOffset + i);
                        // DateDayVector encodes dates as days since the Unix epoch.
                        vec.set(i, (int) date.toEpochDay());
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case DATETIME:
            case TIMESTAMP: {
                TimeStampMilliVector vec = (TimeStampMilliVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        LocalDateTime dt = vc.getDateTime(rowOffset + i);
                        // NOTE(review): converted in the JVM's default zone —
                        // assumes BE and the MC session agree on it; confirm.
                        long millis = dt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                        vec.set(i, millis);
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            case BINARY: {
                VarBinaryVector vec = (VarBinaryVector) root.getVector(colIdx);
                vec.allocateNew(numRows);
                for (int i = 0; i < numRows; i++) {
                    if (vc.isNullAt(rowOffset + i)) {
                        vec.setNull(i);
                    } else {
                        byte[] bytes = vc.getBytesWithOffset(rowOffset + i);
                        vec.setSafe(i, bytes);
                    }
                }
                vec.setValueCount(numRows);
                break;
            }
            default: {
                // For complex types (ARRAY, MAP, STRUCT) and other types,
                // fall back to object-based materialization for this column only.
                // colData is already sliced to [rowOffset, rowOffset + numRows),
                // hence the 0 offset passed to fillArrowVector.
                Object[] colData = vc.getObjectColumn(rowOffset, rowOffset + numRows);
                fillArrowVector(root, colIdx, odpsType, colData, 0, numRows);
                break;
            }
        }
    }
+
     private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType 
odpsType,
-                                  Object[] colData, int numRows) {
+                                  Object[] colData, int rowOffset, int 
numRows) {
         switch (odpsType) {
             case BOOLEAN: {
                 BitVector vec = (BitVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.set(i, (Boolean) colData[i] ? 1 : 0);
+                        vec.set(i, (Boolean) colData[rowOffset + i] ? 1 : 0);
                     }
                 }
                 vec.setValueCount(numRows);
@@ -250,10 +602,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 TinyIntVector vec = (TinyIntVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.set(i, ((Number) colData[i]).byteValue());
+                        vec.set(i, ((Number) colData[rowOffset + 
i]).byteValue());
                     }
                 }
                 vec.setValueCount(numRows);
@@ -263,10 +615,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 SmallIntVector vec = (SmallIntVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.set(i, ((Number) colData[i]).shortValue());
+                        vec.set(i, ((Number) colData[rowOffset + 
i]).shortValue());
                     }
                 }
                 vec.setValueCount(numRows);
@@ -276,10 +628,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 IntVector vec = (IntVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.set(i, ((Number) colData[i]).intValue());
+                        vec.set(i, ((Number) colData[rowOffset + 
i]).intValue());
                     }
                 }
                 vec.setValueCount(numRows);
@@ -289,10 +641,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 BigIntVector vec = (BigIntVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.set(i, ((Number) colData[i]).longValue());
+                        vec.set(i, ((Number) colData[rowOffset + 
i]).longValue());
                     }
                 }
                 vec.setValueCount(numRows);
@@ -302,10 +654,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 Float4Vector vec = (Float4Vector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.set(i, ((Number) colData[i]).floatValue());
+                        vec.set(i, ((Number) colData[rowOffset + 
i]).floatValue());
                     }
                 }
                 vec.setValueCount(numRows);
@@ -315,10 +667,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 Float8Vector vec = (Float8Vector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.set(i, ((Number) colData[i]).doubleValue());
+                        vec.set(i, ((Number) colData[rowOffset + 
i]).doubleValue());
                     }
                 }
                 vec.setValueCount(numRows);
@@ -328,12 +680,12 @@ public class MaxComputeJniWriter extends JniWriter {
                 DecimalVector vec = (DecimalVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        BigDecimal bd = (colData[i] instanceof BigDecimal)
-                                ? (BigDecimal) colData[i]
-                                : new BigDecimal(colData[i].toString());
+                        BigDecimal bd = (colData[rowOffset + i] instanceof 
BigDecimal)
+                                ? (BigDecimal) colData[rowOffset + i]
+                                : new BigDecimal(colData[rowOffset + 
i].toString());
                         vec.set(i, bd);
                     }
                 }
@@ -346,14 +698,14 @@ public class MaxComputeJniWriter extends JniWriter {
                 VarCharVector vec = (VarCharVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
                         byte[] bytes;
-                        if (colData[i] instanceof byte[]) {
-                            bytes = (byte[]) colData[i];
+                        if (colData[rowOffset + i] instanceof byte[]) {
+                            bytes = (byte[]) colData[rowOffset + i];
                         } else {
-                            bytes = 
colData[i].toString().getBytes(StandardCharsets.UTF_8);
+                            bytes = colData[rowOffset + 
i].toString().getBytes(StandardCharsets.UTF_8);
                         }
                         vec.setSafe(i, bytes);
                     }
@@ -365,12 +717,12 @@ public class MaxComputeJniWriter extends JniWriter {
                 DateDayVector vec = (DateDayVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
-                    } else if (colData[i] instanceof LocalDate) {
-                        vec.set(i, (int) ((LocalDate) 
colData[i]).toEpochDay());
+                    } else if (colData[rowOffset + i] instanceof LocalDate) {
+                        vec.set(i, (int) ((LocalDate) colData[rowOffset + 
i]).toEpochDay());
                     } else {
-                        vec.set(i, (int) 
LocalDate.parse(colData[i].toString()).toEpochDay());
+                        vec.set(i, (int) LocalDate.parse(colData[rowOffset + 
i].toString()).toEpochDay());
                     }
                 }
                 vec.setValueCount(numRows);
@@ -381,16 +733,16 @@ public class MaxComputeJniWriter extends JniWriter {
                 TimeStampMilliVector vec = (TimeStampMilliVector) 
root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
-                    } else if (colData[i] instanceof LocalDateTime) {
-                        long millis = ((LocalDateTime) colData[i])
+                    } else if (colData[rowOffset + i] instanceof 
LocalDateTime) {
+                        long millis = ((LocalDateTime) colData[rowOffset + i])
                                 
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                         vec.set(i, millis);
-                    } else if (colData[i] instanceof java.sql.Timestamp) {
-                        vec.set(i, ((java.sql.Timestamp) 
colData[i]).getTime());
+                    } else if (colData[rowOffset + i] instanceof 
java.sql.Timestamp) {
+                        vec.set(i, ((java.sql.Timestamp) colData[rowOffset + 
i]).getTime());
                     } else {
-                        long millis = 
LocalDateTime.parse(colData[i].toString())
+                        long millis = LocalDateTime.parse(colData[rowOffset + 
i].toString())
                                 
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                         vec.set(i, millis);
                     }
@@ -402,12 +754,12 @@ public class MaxComputeJniWriter extends JniWriter {
                 VarBinaryVector vec = (VarBinaryVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
-                    } else if (colData[i] instanceof byte[]) {
-                        vec.setSafe(i, (byte[]) colData[i]);
+                    } else if (colData[rowOffset + i] instanceof byte[]) {
+                        vec.setSafe(i, (byte[]) colData[rowOffset + i]);
                     } else {
-                        vec.setSafe(i, 
colData[i].toString().getBytes(StandardCharsets.UTF_8));
+                        vec.setSafe(i, colData[rowOffset + 
i].toString().getBytes(StandardCharsets.UTF_8));
                     }
                 }
                 vec.setValueCount(numRows);
@@ -419,10 +771,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 FieldVector dataVec = listVec.getDataVector();
                 int elemIdx = 0;
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         listVec.setNull(i);
                     } else {
-                        List<?> list = (List<?>) colData[i];
+                        List<?> list = (List<?>) colData[rowOffset + i];
                         listVec.startNewValue(i);
                         for (Object elem : list) {
                             writeListElement(dataVec, elemIdx++, elem);
@@ -442,10 +794,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 FieldVector valVec = structVec.getChildrenFromFields().get(1);
                 int elemIdx = 0;
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         mapVec.setNull(i);
                     } else {
-                        Map<?, ?> map = (Map<?, ?>) colData[i];
+                        Map<?, ?> map = (Map<?, ?>) colData[rowOffset + i];
                         mapVec.startNewValue(i);
                         for (Map.Entry<?, ?> entry : map.entrySet()) {
                             structVec.setIndexDefined(elemIdx);
@@ -466,11 +818,11 @@ public class MaxComputeJniWriter extends JniWriter {
                 StructVector structVec = (StructVector) root.getVector(colIdx);
                 structVec.allocateNew();
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         structVec.setNull(i);
                     } else {
                         structVec.setIndexDefined(i);
-                        Map<?, ?> struct = (Map<?, ?>) colData[i];
+                        Map<?, ?> struct = (Map<?, ?>) colData[rowOffset + i];
                         for (FieldVector childVec : 
structVec.getChildrenFromFields()) {
                             Object val = struct.get(childVec.getName());
                             writeListElement(childVec, i, val);
@@ -488,10 +840,10 @@ public class MaxComputeJniWriter extends JniWriter {
                 VarCharVector vec = (VarCharVector) root.getVector(colIdx);
                 vec.allocateNew(numRows);
                 for (int i = 0; i < numRows; i++) {
-                    if (colData[i] == null) {
+                    if (colData[rowOffset + i] == null) {
                         vec.setNull(i);
                     } else {
-                        vec.setSafe(i, 
colData[i].toString().getBytes(StandardCharsets.UTF_8));
+                        vec.setSafe(i, colData[rowOffset + 
i].toString().getBytes(StandardCharsets.UTF_8));
                     }
                 }
                 vec.setValueCount(numRows);
@@ -580,23 +932,41 @@ public class MaxComputeJniWriter extends JniWriter {
 
     @Override
     public void close() throws IOException {
+        Exception firstException = null;
         try {
+            // Commit the final segment's batchWriter
             if (batchWriter != null) {
-                commitMessage = batchWriter.commit();
-                batchWriter = null;
+                try {
+                    WriterCommitMessage msg = batchWriter.commit();
+                    commitMessages.add(msg);
+                } catch (Exception e) {
+                    firstException = e;
+                    LOG.warn("Failed to commit batch writer for table " + 
project + "." + tableName, e);
+                } finally {
+                    batchWriter = null;
+                }
             }
+        } finally {
             if (allocator != null) {
-                allocator.close();
-                allocator = null;
+                try {
+                    allocator.close();
+                } catch (Exception e) {
+                    LOG.warn("Failed to close Arrow allocator (possible memory 
leak)", e);
+                    if (firstException == null) {
+                        firstException = e;
+                    }
+                } finally {
+                    allocator = null;
+                }
             }
-            LOG.info("MaxComputeJniWriter closed: writeSessionId=" + 
writeSessionId
-                    + ", partitionSpec=" + partitionSpec
-                    + ", writtenRows=" + writtenRows
-                    + ", blockId=" + blockId);
-        } catch (Exception e) {
-            String errorMsg = "Failed to close MaxCompute arrow writer";
-            LOG.error(errorMsg, e);
-            throw new IOException(errorMsg, e);
+        }
+        LOG.info("MaxComputeJniWriter closed: writeSessionId=" + writeSessionId
+                + ", partitionSpec=" + partitionSpec
+                + ", writtenRows=" + writtenRows
+                + ", totalSegments=" + commitMessages.size()
+                + ", blockId=" + blockId);
+        if (firstException != null) {
+            throw new IOException("Failed to close MaxCompute arrow writer", 
firstException);
         }
     }
 
@@ -605,16 +975,18 @@ public class MaxComputeJniWriter extends JniWriter {
         Map<String, String> stats = new HashMap<>();
         stats.put("mc_partition_spec", partitionSpec != null ? partitionSpec : 
"");
 
-        // Serialize WriterCommitMessage to Base64
-        if (commitMessage != null) {
+        // Serialize all WriterCommitMessages (one per segment) as a List 
object.
+        if (!commitMessages.isEmpty()) {
             try {
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(baos);
-                oos.writeObject(commitMessage);
+                // Serialize the entire list as one object to avoid mixing
+                // writeInt/writeObject which causes OptionalDataException
+                oos.writeObject(new java.util.ArrayList<>(commitMessages));
                 oos.close();
                 stats.put("mc_commit_message", 
Base64.getEncoder().encodeToString(baos.toByteArray()));
             } catch (IOException e) {
-                LOG.error("Failed to serialize WriterCommitMessage", e);
+                LOG.error("Failed to serialize WriterCommitMessages", e);
             }
         }
 
@@ -625,3 +997,4 @@ public class MaxComputeJniWriter extends JniWriter {
         return stats;
     }
 }
+
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
index 627f3bc03e2..9ca45b19ea8 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
@@ -57,6 +57,12 @@ public class MCProperties {
     public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
     public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
 
+    public static final String MAX_FIELD_SIZE = "mc.max_field_size_bytes";
+    public static final String DEFAULT_MAX_FIELD_SIZE = "8388608"; // 8 * 1024 
* 1024 = 8MB
+
+    public static final String MAX_WRITE_BATCH_ROWS = 
"mc.max_write_batch_rows";
+    public static final String DEFAULT_MAX_WRITE_BATCH_ROWS = "4096";
+
     //withCrossPartition(true):
     //      Very friendly to scenarios where there are many partitions but 
each partition is very small.
     //withCrossPartition(false):
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
index 871312c476c..77200d47cec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
@@ -95,6 +95,7 @@ public class MCTransaction implements Transaction {
             TableWriteSessionBuilder builder = new TableWriteSessionBuilder()
                     .identifier(tableId)
                     .withSettings(catalog.getSettings())
+                    .withMaxFieldSize(catalog.getMaxFieldSize())
                     .withArrowOptions(ArrowOptions.newBuilder()
                             .withDatetimeUnit(TimestampUnit.MILLI)
                             .withTimestampUnit(TimestampUnit.MILLI)
@@ -136,9 +137,13 @@ public class MCTransaction implements Transaction {
                         byte[] bytes = 
Base64.getDecoder().decode(data.getCommitMessage());
                         ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes);
                         ObjectInputStream ois = new ObjectInputStream(bais);
-                        WriterCommitMessage msg = (WriterCommitMessage) 
ois.readObject();
+                        // Deserialized as List<WriterCommitMessage> — 
supports segmented
+                        // commit where one writer produces multiple commit 
messages
+                        @SuppressWarnings("unchecked")
+                        List<WriterCommitMessage> msgs =
+                                (List<WriterCommitMessage>) ois.readObject();
+                        allMessages.addAll(msgs);
                         ois.close();
-                        allMessages.add(msg);
                     }
                 }
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
index 73eb62f5dc2..b3a15355b14 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
@@ -69,6 +69,8 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
     private int connectTimeout;
     private int readTimeout;
     private int retryTimes;
+    private long maxFieldSize;
+    private int maxWriteBatchRows;
 
     public boolean dateTimePredicatePushDown;
 
@@ -191,6 +193,10 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
                 props.getOrDefault(MCProperties.READ_TIMEOUT, 
MCProperties.DEFAULT_READ_TIMEOUT));
         retryTimes = Integer.parseInt(
                 props.getOrDefault(MCProperties.RETRY_COUNT, 
MCProperties.DEFAULT_RETRY_COUNT));
+        maxFieldSize = Long.parseLong(
+                props.getOrDefault(MCProperties.MAX_FIELD_SIZE, 
MCProperties.DEFAULT_MAX_FIELD_SIZE));
+        maxWriteBatchRows = Integer.parseInt(
+                props.getOrDefault(MCProperties.MAX_WRITE_BATCH_ROWS, 
MCProperties.DEFAULT_MAX_WRITE_BATCH_ROWS));
 
         RestOptions restOptions = RestOptions.newBuilder()
                 .withConnectTimeout(connectTimeout)
@@ -320,6 +326,16 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
         return readTimeout;
     }
 
    /**
     * Max field size in bytes for the MaxCompute write session, taken from
     * the mc.max_field_size_bytes catalog property (default 8MB).
     */
    public long getMaxFieldSize() {
        // Ensure catalog properties have been parsed before reading the field.
        makeSureInitialized();
        return maxFieldSize;
    }
+
    /**
     * Max rows per Arrow batch for write, taken from the
     * mc.max_write_batch_rows catalog property (default 4096).
     */
    public int getMaxWriteBatchRows() {
        // Ensure catalog properties have been parsed before reading the field.
        makeSureInitialized();
        return maxWriteBatchRows;
    }
+
     public boolean getDateTimePredicatePushDown() {
         return dateTimePredicatePushDown;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
index db9d7aec00b..c02a2553e79 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
@@ -114,6 +114,19 @@ public class PhysicalMaxComputeTableSink<CHILD_TYPE 
extends Plan> extends Physic
                 .map(Column::getName)
                 .collect(Collectors.toSet());
         if (!partitionNames.isEmpty()) {
+            // Check if any partition column is present in cols (the bound 
columns from SELECT).
+            // Static partition columns are excluded from cols by 
BindSink.bindMaxComputeTableSink(),
+            // so if no partition column remains in cols, all partitions are 
statically specified
+            // and we don't need sort/shuffle — all data goes to a single 
known partition.
+            Set<String> colNames = cols.stream()
+                    .map(Column::getName)
+                    .collect(Collectors.toSet());
+            boolean hasDynamicPartition = 
partitionNames.stream().anyMatch(colNames::contains);
+            if (!hasDynamicPartition) {
+                // All partition columns are statically specified, no sort 
needed
+                return PhysicalProperties.SINK_RANDOM_PARTITIONED;
+            }
+
             List<Integer> columnIdx = new ArrayList<>();
             List<Column> fullSchema = targetTable.getFullSchema();
             for (int i = 0; i < fullSchema.size(); i++) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
index fdb50245a8e..bf52e894628 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
@@ -74,6 +74,7 @@ public class MaxComputeTableSink extends 
BaseExternalTableDataSink {
         tSink.setConnectTimeout(catalog.getConnectTimeout());
         tSink.setReadTimeout(catalog.getReadTimeout());
         tSink.setRetryCount(catalog.getRetryTimes());
+        tSink.setMaxWriteBatchRows(catalog.getMaxWriteBatchRows());
 
         // Partition columns
         List<String> partitionColumnNames = 
targetTable.getPartitionColumns().stream()
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 8add59d47af..02b064a4aa1 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -514,6 +514,7 @@ struct TMaxComputeTableSink {
     14: optional list<string> partition_columns  // partition column names for 
dynamic partition
     15: optional string write_session_id          // Storage API write session 
ID
     16: optional map<string, string> properties // contains authentication 
properties
+    17: optional i32 max_write_batch_rows          // max rows per Arrow batch 
for write
 }
 
 struct TDataSink {
diff --git 
a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
 
b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
index 50b2cb76282..4b1de864f61 100644
--- 
a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
+++ 
b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
@@ -44,3 +44,13 @@
 8      name_8  0.08    20250101        r3
 9      name_9  0.09    20250102        r1
 
+-- !props_count --
+2000
+
+-- !props_top5 --
+0      name_0
+1      name_1
+2      name_2
+3      name_3
+4      name_4
+
diff --git 
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
 
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
index 6897a01125c..d45389e9ee0 100644
--- 
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
+++ 
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
@@ -140,4 +140,53 @@ suite("test_mc_write_large_data", "p2,external") {
         sql """DROP TABLE IF EXISTS internal.${internal_db}.${internal_tb}"""
         sql """DROP DATABASE IF EXISTS internal.${internal_db}"""
     }
+
+    // Test: mc.max_write_batch_rows and mc.max_field_size_bytes catalog 
properties
+    String mc_catalog_props = "test_mc_write_large_data_props"
+    sql """drop catalog if exists ${mc_catalog_props}"""
+    sql """
+    CREATE CATALOG IF NOT EXISTS ${mc_catalog_props} PROPERTIES (
+        "type" = "max_compute",
+        "mc.default.project" = "${defaultProject}",
+        "mc.access_key" = "${ak}",
+        "mc.secret_key" = "${sk}",
+        "mc.endpoint" = 
"http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api",
+        "mc.quota" = "pay-as-you-go",
+        "mc.enable.namespace.schema" = "true",
+        "mc.max_write_batch_rows" = "512",
+        "mc.max_field_size_bytes" = "4194304"
+    );
+    """
+
+    sql """switch ${mc_catalog_props}"""
+    String db_props = "mc_props_test_${uuid}"
+    sql """drop database if exists ${db_props}"""
+    sql """create database ${db_props}"""
+    sql """use ${db_props}"""
+
+    try {
+        String tb_props = "props_verify_${uuid}"
+        sql """DROP TABLE IF EXISTS ${tb_props}"""
+        sql """
+        CREATE TABLE ${tb_props} (
+            id INT,
+            name STRING
+        )
+        """
+
+        // Insert 2000 rows to exceed max_write_batch_rows=512 (will be split 
into 4 batches)
+        sql """
+        INSERT INTO ${tb_props}
+        SELECT
+            number AS id,
+            concat('name_', cast(number AS STRING)) AS name
+        FROM numbers("number"="2000")
+        """
+
+        qt_props_count """ SELECT count(*) FROM ${tb_props} """
+        order_qt_props_top5 """ SELECT * FROM ${tb_props} ORDER BY id LIMIT 5 
"""
+    } finally {
+        sql """drop database if exists ${mc_catalog_props}.${db_props}"""
+        sql """drop catalog if exists ${mc_catalog_props}"""
+    }
 }
diff --git 
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
 
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
index c4a06d8634e..0e4aafe7389 100644
--- 
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
+++ 
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
@@ -66,6 +66,17 @@ suite("test_mc_write_static_partitions", "p2,external") {
         order_qt_static_single_p1 """ SELECT * FROM ${tb1} WHERE ds = 
'20250101' """
         order_qt_static_single_p2 """ SELECT * FROM ${tb1} WHERE ds = 
'20250102' """
 
+        // Explain: static partition INSERT should NOT have SORT node
+        explain {
+            sql("INSERT INTO ${tb1} PARTITION(ds='20250103') VALUES (4, 'd')")
+            notContains "SORT"
+        }
+        // Explain: static partition INSERT OVERWRITE should NOT have SORT node
+        explain {
+            sql("INSERT OVERWRITE TABLE ${tb1} PARTITION(ds='20250101') VALUES 
(5, 'e')")
+            notContains "SORT"
+        }
+
         // Test 2: Multi-level partition columns static partition INSERT INTO
         String tb2 = "static_multi_${uuid}"
         sql """DROP TABLE IF EXISTS ${tb2}"""
@@ -82,6 +93,12 @@ suite("test_mc_write_static_partitions", "p2,external") {
         order_qt_static_multi_all """ SELECT * FROM ${tb2} """
         order_qt_static_multi_bj """ SELECT * FROM ${tb2} WHERE region = 'bj' 
"""
 
+        // Explain: all partition columns statically specified should NOT have 
SORT node
+        explain {
+            sql("INSERT INTO ${tb2} PARTITION(ds='20250102', region='gz') 
VALUES (4, 'v4')")
+            notContains "SORT"
+        }
+
         test {
             sql """ INSERT INTO ${tb2} PARTITION(ds='20250101', region='bj', 
ds='20250102') VALUES (1, 'v1'), (2, 'v2');"""
             exception "Duplicate partition column: ds"
@@ -111,6 +128,12 @@ suite("test_mc_write_static_partitions", "p2,external") {
         sql """INSERT INTO ${tb3_dst} PARTITION(ds='20250201') SELECT id, name 
FROM ${tb3_src}"""
         order_qt_static_select """ SELECT * FROM ${tb3_dst} """
 
+        // Explain: static partition INSERT INTO SELECT should NOT have SORT 
node
+        explain {
+            sql("INSERT INTO ${tb3_dst} PARTITION(ds='20250202') SELECT id, 
name FROM ${tb3_src}")
+            notContains "SORT"
+        }
+
         // Test 4: INSERT OVERWRITE static partition
         String tb4 = "overwrite_part_${uuid}"
         sql """DROP TABLE IF EXISTS ${tb4}"""
@@ -129,6 +152,7 @@ suite("test_mc_write_static_partitions", "p2,external") {
         order_qt_overwrite_p2 """ SELECT * FROM ${tb4} WHERE ds = '20250102' 
"""
 
         // Test 5: Dynamic partition regression (ensure not broken)
+        // Dynamic partition: partition column 'ds' is in the data, SORT node 
IS expected
         String tb5 = "dynamic_reg_${uuid}"
         sql """DROP TABLE IF EXISTS ${tb5}"""
         sql """
@@ -141,6 +165,17 @@ suite("test_mc_write_static_partitions", "p2,external") {
         sql """INSERT INTO ${tb5} VALUES (1, 'a', '20250101'), (2, 'b', 
'20250102')"""
         order_qt_dynamic_regression """ SELECT * FROM ${tb5} """
 
+        // Explain: dynamic partition INSERT should HAVE SORT node (partition 
col in data)
+        explain {
+            sql("INSERT INTO ${tb5} VALUES (3, 'c', '20250103')")
+            contains "SORT"
+        }
+        // Explain: dynamic partition INSERT INTO SELECT should HAVE SORT node
+        explain {
+            sql("INSERT INTO ${tb5} SELECT * FROM ${tb3_src}")
+            contains "SORT"
+        }
+
         // Test 6: INSERT OVERWRITE non-partitioned table
         String tb6 = "overwrite_nopart_${uuid}"
         sql """DROP TABLE IF EXISTS ${tb6}"""
@@ -153,6 +188,41 @@ suite("test_mc_write_static_partitions", "p2,external") {
         sql """INSERT INTO ${tb6} VALUES (1, 'old')"""
         sql """INSERT OVERWRITE TABLE ${tb6} VALUES (2, 'new')"""
         order_qt_overwrite_no_part """ SELECT * FROM ${tb6} """
+
+        // Explain: non-partitioned table INSERT should NOT have SORT node
+        explain {
+            sql("INSERT INTO ${tb6} VALUES (3, 'val')")
+            notContains "SORT"
+        }
+        // Explain: non-partitioned table INSERT OVERWRITE should NOT have 
SORT node
+        explain {
+            sql("INSERT OVERWRITE TABLE ${tb6} VALUES (4, 'val2')")
+            notContains "SORT"
+        }
+
+        // Test 7: Multi-level partition with partial static (only some 
partition cols specified)
+        // When only some partition columns are statically specified (partial 
static),
+        // the remaining partition columns are dynamic, so SORT node IS 
expected
+        String tb7 = "partial_static_${uuid}"
+        sql """DROP TABLE IF EXISTS ${tb7}"""
+        sql """
+        CREATE TABLE ${tb7} (
+            id INT,
+            val STRING,
+            ds STRING,
+            region STRING
+        ) PARTITION BY (ds, region)()
+        """
+        // Explain: partial static partition (ds static, region dynamic) 
should HAVE SORT node
+        explain {
+            sql("INSERT INTO ${tb7} PARTITION(ds='20250101') VALUES (1, 'v1', 
'bj')")
+            contains "SORT"
+        }
+        // Explain: all static partition should NOT have SORT node
+        explain {
+            sql("INSERT INTO ${tb7} PARTITION(ds='20250101', region='bj') 
VALUES (1, 'v1')")
+            notContains "SORT"
+        }
     } finally {
         sql """drop database if exists ${mc_catalog_name}.${db}"""
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to