This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6f5ef0a4da5 [fix](mc) fix memory leak and optimize large data write 
for MaxCompute connector (#61245) (#64449)
6f5ef0a4da5 is described below

commit 6f5ef0a4da5579ae25dfecc9bf556ffefb00f61e
Author: daidai <[email protected]>
AuthorDate: Fri Jun 12 22:41:24 2026 +0800

    [fix](mc) fix memory leak and optimize large data write for MaxCompute 
connector (#61245) (#64449)
    
    pick #61245
    Fix:
    - Fix potential memory leak in MaxComputeJniScanner by closing
    currentSplitReader in close().
    
    Optimization:
    - mc.max_field_size_bytes: max field size in bytes for write session
    (default: 8MB)
---
 .../doris/maxcompute/MaxComputeJniScanner.java     | 164 ++++++++++++++++++---
 .../doris/common/maxcompute/MCProperties.java      |   3 +
 .../maxcompute/MaxComputeExternalCatalog.java      |   8 +
 3 files changed, 158 insertions(+), 17 deletions(-)

diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 0b196fcb9cd..336991f3802 100644
--- 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -33,6 +33,7 @@ import com.aliyun.odps.table.read.split.InputSplit;
 import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
 import com.aliyun.odps.table.read.split.impl.RowRangeInputSplit;
 import com.google.common.base.Strings;
+import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.log4j.Logger;
@@ -60,6 +61,13 @@ public class MaxComputeJniScanner extends JniScanner {
 
     private static final Logger LOG = 
Logger.getLogger(MaxComputeJniScanner.class);
 
+    // 256MB byte budget per scanner batch — limits the C++ Block size at the 
source.
+    // With large rows (e.g. 585KB/row STRING), batch_size=4096 would create 
~2.4GB Blocks.
+    // The pipeline's AsyncResultWriter queues up to 3 Blocks per instance, 
and with
+    // parallel_pipeline_task_num instances, total queue memory = instances * 
3 * block_size.
+    // 256MB keeps queue memory manageable: 5 instances * 3 * 256MB = 3.8GB.
+    private static final long MAX_BATCH_BYTES = 256 * 1024 * 1024L;
+
     private static final String ACCESS_KEY = "access_key";
     private static final String SECRET_KEY = "secret_key";
     private static final String ENDPOINT = "endpoint";
@@ -86,10 +94,12 @@ public class MaxComputeJniScanner extends JniScanner {
     private TableBatchReadSession scan;
     public  String sessionId;
 
-    private String project; //final ???
+    private String project;
     private String table;
 
     private SplitReader<VectorSchemaRoot> currentSplitReader;
+    private VectorSchemaRoot currentBatch = null;
+    private int currentBatchRowOffset = 0;
     private MaxComputeColumnValue columnValue;
 
     private Map<String, Integer> readColumnsToId;
@@ -215,8 +225,17 @@ public class MaxComputeJniScanner extends JniScanner {
 
     @Override
     public void close() throws IOException {
+        if (currentSplitReader != null) {
+            try {
+                currentSplitReader.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close MaxCompute split reader for table " 
+ project + "." + table, e);
+            }
+        }
         startOffset = -1;
         splitSize = -1;
+        currentBatch = null;
+        currentBatchRowOffset = 0;
         currentSplitReader = null;
         settings = null;
         scan = null;
@@ -234,45 +253,74 @@ public class MaxComputeJniScanner extends JniScanner {
         return readVectors(expectedRows);
     }
 
+    private VectorSchemaRoot getNextBatch() throws IOException {
+        try {
+            if (!currentSplitReader.hasNext()) {
+                currentSplitReader.close();
+                currentSplitReader = null;
+                return null;
+            }
+            return currentSplitReader.get();
+        } catch (Exception e) {
+            String errorMsg = "MaxComputeJniScanner readVectors get batch 
fail";
+            LOG.warn(errorMsg, e);
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
     private int readVectors(int expectedRows) throws IOException {
         int curReadRows = 0;
+        long accumulatedBytes = 0;
         while (curReadRows < expectedRows) {
-            try {
-                if (!currentSplitReader.hasNext()) {
-                    currentSplitReader.close();
-                    currentSplitReader = null;
+            // Stop early if accumulated variable-width bytes approach int32 
limit
+            if (accumulatedBytes >= MAX_BATCH_BYTES) {
+                break;
+            }
+            if (currentBatch == null) {
+                currentBatch = getNextBatch();
+                if (currentBatch == null || currentBatch.getRowCount() == 0) {
+                    currentBatch = null;
                     break;
                 }
-            } catch (Exception e) {
-                String errorMsg = "MaxComputeJniScanner readVectors hasNext 
fail";
-                LOG.warn(errorMsg, e);
-                throw new IOException(e.getMessage(), e);
+                currentBatchRowOffset = 0;
             }
-
             try {
-                VectorSchemaRoot data = currentSplitReader.get();
-                if (data.getRowCount() == 0) {
+                int rowsToAppend = Math.min(expectedRows - curReadRows,
+                        currentBatch.getRowCount() - currentBatchRowOffset);
+                List<FieldVector> fieldVectors = 
currentBatch.getFieldVectors();
+
+                // Limit rows to avoid int32 overflow in VectorColumn's String 
byte buffer
+                rowsToAppend = limitRowsByVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend,
+                        MAX_BATCH_BYTES - accumulatedBytes);
+                if (rowsToAppend <= 0) {
                     break;
                 }
 
-                List<FieldVector> fieldVectors = data.getFieldVectors();
-                int batchRows = 0;
                 long startTime = System.nanoTime();
                 for (FieldVector column : fieldVectors) {
                     Integer readColumnId = 
readColumnsToId.get(column.getName());
-                    batchRows = column.getValueCount();
                     if (readColumnId == null) {
                         continue;
                     }
                     columnValue.reset(column);
-                    for (int j = 0; j < batchRows; j++) {
+                    for (int j = currentBatchRowOffset; j < 
currentBatchRowOffset + rowsToAppend; j++) {
                         columnValue.setColumnIdx(j);
                         appendData(readColumnId, columnValue);
                     }
                 }
                 appendDataTime += System.nanoTime() - startTime;
 
-                curReadRows += batchRows;
+                // Track bytes for the rows just appended
+                accumulatedBytes += estimateVarWidthBytes(
+                        fieldVectors, currentBatchRowOffset, rowsToAppend);
+
+                currentBatchRowOffset += rowsToAppend;
+                curReadRows += rowsToAppend;
+                if (currentBatchRowOffset >= currentBatch.getRowCount()) {
+                    currentBatch = null;
+                    currentBatchRowOffset = 0;
+                }
             } catch (Exception e) {
                 String errorMsg = String.format("MaxComputeJniScanner Fail to 
read arrow data. "
                         + "curReadRows = {}, expectedRows = {}", curReadRows, 
expectedRows);
@@ -280,9 +328,91 @@ public class MaxComputeJniScanner extends JniScanner {
                 throw new RuntimeException(errorMsg, e);
             }
         }
+        if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows < 
expectedRows) {
+            LOG.debug("readVectors: returning " + curReadRows + " rows 
(limited by byte budget)"
+                    + ", totalVarWidthBytes=" + accumulatedBytes
+                    + ", expectedRows=" + expectedRows);
+        }
         return curReadRows;
     }
 
+    /**
+     * Limit the number of rows to append so that no single variable-width 
column
+     * exceeds the remaining byte budget. This prevents int32 overflow in
+     * VectorColumn's appendIndex for String/Binary child byte arrays.
+     *
+     * Uses Arrow's offset buffer for O(1)-per-row byte size calculation —
+     * no data copying involved.
+     */
+    private int limitRowsByVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int maxRows, long remainingBudget) {
+        if (remainingBudget <= 0) {
+            return 0;
+        }
+        int safeRows = maxRows;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                // Find how many rows fit within the budget for THIS column
+                int rows = findMaxRowsWithinBudget(vec, offset, maxRows, 
remainingBudget);
+                safeRows = Math.min(safeRows, rows);
+            }
+        }
+        // Always allow at least 1 row to make progress, even if it exceeds 
budget
+        return Math.max(1, safeRows);
+    }
+
+    /**
+     * Binary search for the maximum number of rows starting at 'offset'
+     * whose total bytes in the variable-width vector fit within 'budget'.
+     */
+    private int findMaxRowsWithinBudget(BaseVariableWidthVector vec,
+            int offset, int maxRows, long budget) {
+        if (maxRows <= 0) {
+            return 0;
+        }
+        // Total bytes for all maxRows
+        long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset + 
maxRows) * 4)
+                - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+        if (totalBytes <= budget) {
+            return maxRows;
+        }
+        // Binary search for the cutoff point
+        int lo = 1;
+        int hi = maxRows - 1;
+        int startOff = vec.getOffsetBuffer().getInt((long) offset * 4);
+        while (lo <= hi) {
+            int mid = lo + (hi - lo) / 2;
+            long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset + 
mid) * 4) - startOff;
+            if (bytes <= budget) {
+                lo = mid + 1;
+            } else {
+                hi = mid - 1;
+            }
+        }
+        // 'hi' is the largest count whose bytes <= budget (could be 0)
+        return hi;
+    }
+
+    /**
+     * Estimate total variable-width bytes for the given row range across all 
columns.
+     * Returns the max bytes of any single column (since each column has its 
own
+     * VectorColumn child buffer and the overflow is per-column).
+     */
+    private long estimateVarWidthBytes(List<FieldVector> fieldVectors,
+            int offset, int rows) {
+        long maxColumnBytes = 0;
+        for (FieldVector fv : fieldVectors) {
+            if (fv instanceof BaseVariableWidthVector) {
+                BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+                long bytes = (long) vec.getOffsetBuffer().getInt((long) 
(offset + rows) * 4)
+                        - (long) vec.getOffsetBuffer().getInt((long) offset * 
4);
+                maxColumnBytes = Math.max(maxColumnBytes, bytes);
+            }
+        }
+        return maxColumnBytes;
+    }
+
     private static Object deserialize(String serializedString) throws 
IOException, ClassNotFoundException {
         byte[] serializedBytes = Base64.getDecoder().decode(serializedString);
         ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(serializedBytes);
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
index 627f3bc03e2..9cdf67bba7f 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
@@ -57,6 +57,9 @@ public class MCProperties {
     public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
     public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
 
+    public static final String MAX_FIELD_SIZE = "mc.max_field_size_bytes";
+    public static final String DEFAULT_MAX_FIELD_SIZE = "8388608"; // 8 * 1024 
* 1024 = 8MB
+
     //withCrossPartition(true):
     //      Very friendly to scenarios where there are many partitions but 
each partition is very small.
     //withCrossPartition(false):
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
index edf339e6702..f03fce35fd7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
@@ -67,6 +67,7 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
     private int connectTimeout;
     private int readTimeout;
     private int retryTimes;
+    private long maxFieldSize;
 
     public boolean dateTimePredicatePushDown;
 
@@ -189,6 +190,8 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
                 props.getOrDefault(MCProperties.READ_TIMEOUT, 
MCProperties.DEFAULT_READ_TIMEOUT));
         retryTimes = Integer.parseInt(
                 props.getOrDefault(MCProperties.RETRY_COUNT, 
MCProperties.DEFAULT_RETRY_COUNT));
+        maxFieldSize = Long.parseLong(
+                props.getOrDefault(MCProperties.MAX_FIELD_SIZE, 
MCProperties.DEFAULT_MAX_FIELD_SIZE));
 
         RestOptions restOptions = RestOptions.newBuilder()
                 .withConnectTimeout(connectTimeout)
@@ -310,6 +313,11 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
         return readTimeout;
     }
 
+    public long getMaxFieldSize() {
+        makeSureInitialized();
+        return maxFieldSize;
+    }
+
     public boolean getDateTimePredicatePushDown() {
         return dateTimePredicatePushDown;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to