This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a78c1d98b90 [fix](mc) fix memory leak and optimize large data write
for MaxCompute connector (#61245)
a78c1d98b90 is described below
commit a78c1d98b904706925ed8e3f8dcbd252e5c10431
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Mar 22 16:59:12 2026 -0700
[fix](mc) fix memory leak and optimize large data write for MaxCompute
connector (#61245)
### What problem does this PR solve?
Fix:
- Fix potential memory leak in MaxComputeJniScanner by closing
currentSplitReader in close().
- Fix potential memory leak in MaxComputeJniWriter by restructuring
close() with try-finally to ensure allocator is always closed even
when batchWriter.commit() throws. Also close VectorSchemaRoot after
each batch write.
- Fix maxWriteBatchRows parameter key mismatch between BE
("max_write_batch_rows") and JNI ("mc.max_write_batch_rows"),
which caused user-customized values to be silently ignored.
Optimization:
- Split large Arrow batches into smaller chunks (controlled by
mc.max_write_batch_rows, default 4096) to avoid HTTP 413 Request
Entity Too Large errors from MaxCompute Storage API.
- Skip unnecessary SORT node for static partition INSERT, since all
data goes to a single known partition and no dynamic routing is
needed.
- Enable ZSTD compression for Arrow data transfer to reduce network
bandwidth.
New catalog properties:
- mc.max_write_batch_rows: max rows per Arrow batch for write
(default: 4096)
- mc.max_field_size_bytes: max field size in bytes for write session
(default: 8MB)
Co-authored-by: daidai <[email protected]>
---
.../sink/writer/maxcompute/vmc_table_writer.cpp | 43 +-
.../exec/sink/writer/maxcompute/vmc_table_writer.h | 6 +
.../doris/maxcompute/MaxComputeJniScanner.java | 162 ++++++-
.../doris/maxcompute/MaxComputeJniWriter.java | 521 ++++++++++++++++++---
.../doris/common/maxcompute/MCProperties.java | 6 +
.../doris/datasource/maxcompute/MCTransaction.java | 9 +-
.../maxcompute/MaxComputeExternalCatalog.java | 16 +
.../physical/PhysicalMaxComputeTableSink.java | 13 +
.../apache/doris/planner/MaxComputeTableSink.java | 1 +
gensrc/thrift/DataSinks.thrift | 1 +
.../maxcompute/write/test_mc_write_large_data.out | 10 +
.../write/test_mc_write_large_data.groovy | 49 ++
.../write/test_mc_write_static_partitions.groovy | 70 +++
13 files changed, 810 insertions(+), 97 deletions(-)
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
index 323bbfbe231..3abb836d87d 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
@@ -117,6 +117,9 @@ std::map<std::string, std::string>
VMCTableWriter::_build_base_writer_params() {
if (_mc_sink.__isset.retry_count) {
params["retry_count"] = std::to_string(_mc_sink.retry_count);
}
+ if (_mc_sink.__isset.max_write_batch_rows) {
+ params["max_write_batch_rows"] =
std::to_string(_mc_sink.max_write_batch_rows);
+ }
return params;
}
@@ -156,13 +159,10 @@ Status VMCTableWriter::write(RuntimeState* state, Block&
block) {
it = _partitions_to_writers.find(_static_partition_spec);
}
output_block.erase(_non_write_columns_indices);
- return it->second->write(output_block);
+ return _write_block_in_chunks(it->second, output_block);
}
// Case 2: Dynamic partition or non-partitioned table
- // For dynamic partitions, MaxCompute Storage API (with
DynamicPartitionOptions) expects
- // partition column values in the Arrow data and handles routing
internally.
- // So we send the full block including partition columns to a single
writer.
std::string partition_key = "";
auto it = _partitions_to_writers.find(partition_key);
if (it == _partitions_to_writers.end()) {
@@ -171,7 +171,40 @@ Status VMCTableWriter::write(RuntimeState* state, Block&
block) {
_partitions_to_writers.insert({partition_key, writer});
it = _partitions_to_writers.find(partition_key);
}
- return it->second->write(output_block);
+ return _write_block_in_chunks(it->second, output_block);
+}
+
+Status VMCTableWriter::_write_block_in_chunks(const
std::shared_ptr<VMCPartitionWriter>& writer,
+ Block& output_block) {
+ // Limit per-JNI data to MAX_WRITE_BLOCK_BYTES. When data source is not MC
scanner
+ // (e.g. Doris internal table, Hive, JDBC), the upstream batch_size
controls Block
+ // row count but not byte size. With large rows (585KB/row), a 4096-row
Block is
+ // ~2.4GB. Splitting ensures each JNI call processes bounded data,
limiting Arrow
+ // and SDK native memory per call.
+ static constexpr size_t MAX_WRITE_BLOCK_BYTES = 256 * 1024 * 1024; // 256MB
+
+ const size_t block_bytes = output_block.allocated_bytes();
+ const size_t rows = output_block.rows();
+
+ if (block_bytes <= MAX_WRITE_BLOCK_BYTES || rows <= 1) {
+ return writer->write(output_block);
+ }
+
+ const size_t bytes_per_row = block_bytes / rows;
+ const size_t max_rows = std::max(size_t(1), MAX_WRITE_BLOCK_BYTES /
bytes_per_row);
+
+ for (size_t offset = 0; offset < rows; offset += max_rows) {
+ const size_t num_rows = std::min(max_rows, rows - offset);
+ Block sub_block = output_block.clone_empty();
+ auto columns = sub_block.mutate_columns();
+ for (size_t i = 0; i < columns.size(); i++) {
+
columns[i]->insert_range_from(*output_block.get_by_position(i).column, offset,
+ num_rows);
+ }
+ sub_block.set_columns(std::move(columns));
+ RETURN_IF_ERROR(writer->write(sub_block));
+ }
+ return Status::OK();
}
Status VMCTableWriter::close(Status status) {
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h
b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h
index 24075e47a06..643b28f1cc0 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h
@@ -54,6 +54,12 @@ public:
private:
std::shared_ptr<VMCPartitionWriter> _create_partition_writer(const
std::string& partition_spec);
+ // Split large blocks into sub-blocks before JNI to limit Arrow and SDK
+ // native memory. Needed when data source is not MC scanner and blocks
+ // may exceed 256MB (e.g. batch_size=4096 with 585KB/row = 2.4GB).
+ Status _write_block_in_chunks(const std::shared_ptr<VMCPartitionWriter>&
writer,
+ Block& output_block);
+
std::map<std::string, std::string> _build_base_writer_params();
TDataSink _t_sink;
diff --git
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index ce1b3941dca..336991f3802 100644
---
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -33,6 +33,7 @@ import com.aliyun.odps.table.read.split.InputSplit;
import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
import com.aliyun.odps.table.read.split.impl.RowRangeInputSplit;
import com.google.common.base.Strings;
+import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.log4j.Logger;
@@ -60,6 +61,13 @@ public class MaxComputeJniScanner extends JniScanner {
private static final Logger LOG =
Logger.getLogger(MaxComputeJniScanner.class);
+ // 256MB byte budget per scanner batch — limits the C++ Block size at the
source.
+ // With large rows (e.g. 585KB/row STRING), batch_size=4096 would create
~2.4GB Blocks.
+ // The pipeline's AsyncResultWriter queues up to 3 Blocks per instance,
and with
+ // parallel_pipeline_task_num instances, total queue memory = instances *
3 * block_size.
+ // 256MB keeps queue memory manageable: 5 instances * 3 * 256MB = 3.8GB.
+ private static final long MAX_BATCH_BYTES = 256 * 1024 * 1024L;
+
private static final String ACCESS_KEY = "access_key";
private static final String SECRET_KEY = "secret_key";
private static final String ENDPOINT = "endpoint";
@@ -90,6 +98,8 @@ public class MaxComputeJniScanner extends JniScanner {
private String table;
private SplitReader<VectorSchemaRoot> currentSplitReader;
+ private VectorSchemaRoot currentBatch = null;
+ private int currentBatchRowOffset = 0;
private MaxComputeColumnValue columnValue;
private Map<String, Integer> readColumnsToId;
@@ -215,8 +225,17 @@ public class MaxComputeJniScanner extends JniScanner {
@Override
public void close() throws IOException {
+ if (currentSplitReader != null) {
+ try {
+ currentSplitReader.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close MaxCompute split reader for table "
+ project + "." + table, e);
+ }
+ }
startOffset = -1;
splitSize = -1;
+ currentBatch = null;
+ currentBatchRowOffset = 0;
currentSplitReader = null;
settings = null;
scan = null;
@@ -234,45 +253,74 @@ public class MaxComputeJniScanner extends JniScanner {
return readVectors(expectedRows);
}
+ private VectorSchemaRoot getNextBatch() throws IOException {
+ try {
+ if (!currentSplitReader.hasNext()) {
+ currentSplitReader.close();
+ currentSplitReader = null;
+ return null;
+ }
+ return currentSplitReader.get();
+ } catch (Exception e) {
+ String errorMsg = "MaxComputeJniScanner readVectors get batch
fail";
+ LOG.warn(errorMsg, e);
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
private int readVectors(int expectedRows) throws IOException {
int curReadRows = 0;
+ long accumulatedBytes = 0;
while (curReadRows < expectedRows) {
- try {
- if (!currentSplitReader.hasNext()) {
- currentSplitReader.close();
- currentSplitReader = null;
+ // Stop early if accumulated variable-width bytes approach int32
limit
+ if (accumulatedBytes >= MAX_BATCH_BYTES) {
+ break;
+ }
+ if (currentBatch == null) {
+ currentBatch = getNextBatch();
+ if (currentBatch == null || currentBatch.getRowCount() == 0) {
+ currentBatch = null;
break;
}
- } catch (Exception e) {
- String errorMsg = "MaxComputeJniScanner readVectors hasNext
fail";
- LOG.warn(errorMsg, e);
- throw new IOException(e.getMessage(), e);
+ currentBatchRowOffset = 0;
}
-
try {
- VectorSchemaRoot data = currentSplitReader.get();
- if (data.getRowCount() == 0) {
+ int rowsToAppend = Math.min(expectedRows - curReadRows,
+ currentBatch.getRowCount() - currentBatchRowOffset);
+ List<FieldVector> fieldVectors =
currentBatch.getFieldVectors();
+
+ // Limit rows to avoid int32 overflow in VectorColumn's String
byte buffer
+ rowsToAppend = limitRowsByVarWidthBytes(
+ fieldVectors, currentBatchRowOffset, rowsToAppend,
+ MAX_BATCH_BYTES - accumulatedBytes);
+ if (rowsToAppend <= 0) {
break;
}
- List<FieldVector> fieldVectors = data.getFieldVectors();
- int batchRows = 0;
long startTime = System.nanoTime();
for (FieldVector column : fieldVectors) {
Integer readColumnId =
readColumnsToId.get(column.getName());
- batchRows = column.getValueCount();
if (readColumnId == null) {
continue;
}
columnValue.reset(column);
- for (int j = 0; j < batchRows; j++) {
+ for (int j = currentBatchRowOffset; j <
currentBatchRowOffset + rowsToAppend; j++) {
columnValue.setColumnIdx(j);
appendData(readColumnId, columnValue);
}
}
appendDataTime += System.nanoTime() - startTime;
- curReadRows += batchRows;
+ // Track bytes for the rows just appended
+ accumulatedBytes += estimateVarWidthBytes(
+ fieldVectors, currentBatchRowOffset, rowsToAppend);
+
+ currentBatchRowOffset += rowsToAppend;
+ curReadRows += rowsToAppend;
+ if (currentBatchRowOffset >= currentBatch.getRowCount()) {
+ currentBatch = null;
+ currentBatchRowOffset = 0;
+ }
} catch (Exception e) {
String errorMsg = String.format("MaxComputeJniScanner Fail to
read arrow data. "
+ "curReadRows = {}, expectedRows = {}", curReadRows,
expectedRows);
@@ -280,9 +328,91 @@ public class MaxComputeJniScanner extends JniScanner {
throw new RuntimeException(errorMsg, e);
}
}
+ if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows <
expectedRows) {
+ LOG.debug("readVectors: returning " + curReadRows + " rows
(limited by byte budget)"
+ + ", totalVarWidthBytes=" + accumulatedBytes
+ + ", expectedRows=" + expectedRows);
+ }
return curReadRows;
}
+ /**
+ * Limit the number of rows to append so that no single variable-width
column
+ * exceeds the remaining byte budget. This prevents int32 overflow in
+ * VectorColumn's appendIndex for String/Binary child byte arrays.
+ *
+ * Uses Arrow's offset buffer for O(1)-per-row byte size calculation —
+ * no data copying involved.
+ */
+ private int limitRowsByVarWidthBytes(List<FieldVector> fieldVectors,
+ int offset, int maxRows, long remainingBudget) {
+ if (remainingBudget <= 0) {
+ return 0;
+ }
+ int safeRows = maxRows;
+ for (FieldVector fv : fieldVectors) {
+ if (fv instanceof BaseVariableWidthVector) {
+ BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+ // Find how many rows fit within the budget for THIS column
+ int rows = findMaxRowsWithinBudget(vec, offset, maxRows,
remainingBudget);
+ safeRows = Math.min(safeRows, rows);
+ }
+ }
+ // Always allow at least 1 row to make progress, even if it exceeds
budget
+ return Math.max(1, safeRows);
+ }
+
+ /**
+ * Binary search for the maximum number of rows starting at 'offset'
+ * whose total bytes in the variable-width vector fit within 'budget'.
+ */
+ private int findMaxRowsWithinBudget(BaseVariableWidthVector vec,
+ int offset, int maxRows, long budget) {
+ if (maxRows <= 0) {
+ return 0;
+ }
+ // Total bytes for all maxRows
+ long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset +
maxRows) * 4)
+ - (long) vec.getOffsetBuffer().getInt((long) offset * 4);
+ if (totalBytes <= budget) {
+ return maxRows;
+ }
+ // Binary search for the cutoff point
+ int lo = 1;
+ int hi = maxRows - 1;
+ int startOff = vec.getOffsetBuffer().getInt((long) offset * 4);
+ while (lo <= hi) {
+ int mid = lo + (hi - lo) / 2;
+ long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset +
mid) * 4) - startOff;
+ if (bytes <= budget) {
+ lo = mid + 1;
+ } else {
+ hi = mid - 1;
+ }
+ }
+ // 'hi' is the largest count whose bytes <= budget (could be 0)
+ return hi;
+ }
+
+ /**
+ * Estimate total variable-width bytes for the given row range across all
columns.
+ * Returns the max bytes of any single column (since each column has its
own
+ * VectorColumn child buffer and the overflow is per-column).
+ */
+ private long estimateVarWidthBytes(List<FieldVector> fieldVectors,
+ int offset, int rows) {
+ long maxColumnBytes = 0;
+ for (FieldVector fv : fieldVectors) {
+ if (fv instanceof BaseVariableWidthVector) {
+ BaseVariableWidthVector vec = (BaseVariableWidthVector) fv;
+ long bytes = (long) vec.getOffsetBuffer().getInt((long)
(offset + rows) * 4)
+ - (long) vec.getOffsetBuffer().getInt((long) offset *
4);
+ maxColumnBytes = Math.max(maxColumnBytes, bytes);
+ }
+ }
+ return maxColumnBytes;
+ }
+
private static Object deserialize(String serializedString) throws
IOException, ClassNotFoundException {
byte[] serializedBytes = Base64.getDecoder().decode(serializedString);
ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(serializedBytes);
diff --git
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 86b849cd627..4e0f2570a82 100644
---
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -18,6 +18,7 @@
package org.apache.doris.maxcompute;
import org.apache.doris.common.jni.JniWriter;
+import org.apache.doris.common.jni.vec.VectorColumn;
import org.apache.doris.common.jni.vec.VectorTable;
import org.apache.doris.common.maxcompute.MCUtils;
@@ -25,6 +26,7 @@ import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
+import com.aliyun.odps.table.configuration.CompressionCodec;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.configuration.WriterOptions;
import com.aliyun.odps.table.enviroment.Credentials;
@@ -92,18 +94,35 @@ public class MaxComputeJniWriter extends JniWriter {
private static final String CONNECT_TIMEOUT = "connect_timeout";
private static final String READ_TIMEOUT = "read_timeout";
private static final String RETRY_COUNT = "retry_count";
+ private static final String MAX_WRITE_BATCH_ROWS = "max_write_batch_rows";
private final Map<String, String> params;
+
+ // 128MB batch threshold — controls peak Arrow native memory per batch.
+ // Arrow uses sun.misc.Unsafe.allocateMemory() which is invisible to JVM
metrics.
+ // Each batch temporarily holds ~batchDataBytes of native memory.
+ // With 3 concurrent writers, total Arrow native = 3 * 128MB = ~384MB.
+ // Using 1GB was too large: 3 writers * 1GB = 3GB invisible native memory.
+ private static final long MAX_ARROW_BATCH_BYTES = 128 * 1024 * 1024L;
+
+ // Segmented commit: commit and recreate batchWriter every N rows to
prevent
+ // MaxCompute SDK native memory accumulation. Without this, the SDK buffers
+ // all written data internally, causing process RSS to grow linearly with
+ // total data volume until SIGSEGV.
+ private static final long ROWS_PER_SEGMENT = 5000;
+
private final String endpoint;
private final String project;
private final String tableName;
private final String quota;
private String writeSessionId;
private long blockId;
+ private long nextBlockId; // For creating new segments with unique blockIds
private String partitionSpec;
private int connectTimeout;
private int readTimeout;
private int retryCount;
+ private int maxWriteBatchRows;
// Storage API objects
private TableBatchWriteSession writeSession;
@@ -111,7 +130,14 @@ public class MaxComputeJniWriter extends JniWriter {
private BufferAllocator allocator;
private List<TypeInfo> columnTypeInfos;
private List<String> columnNames;
- private WriterCommitMessage commitMessage;
+ // Collect commit messages from all segments (each batchWriter commit
produces one)
+ private final List<WriterCommitMessage> commitMessages = new
java.util.ArrayList<>();
+
+ // Per-segment row counter (resets after each segment commit)
+ private long segmentRows = 0;
+
+ // Writer options cached for creating new batchWriters
+ private WriterOptions writerOptions;
// Statistics
private long writtenRows = 0;
@@ -127,10 +153,12 @@ public class MaxComputeJniWriter extends JniWriter {
this.writeSessionId =
Objects.requireNonNull(params.get(WRITE_SESSION_ID),
"required property '" + WRITE_SESSION_ID + "'.");
this.blockId = Long.parseLong(params.getOrDefault(BLOCK_ID, "0"));
+ this.nextBlockId = this.blockId + 1; // Reserve blockId for first
writer, increment for segments
this.partitionSpec = params.getOrDefault(PARTITION_SPEC, "");
this.connectTimeout =
Integer.parseInt(params.getOrDefault(CONNECT_TIMEOUT, "10"));
this.readTimeout = Integer.parseInt(params.getOrDefault(READ_TIMEOUT,
"120"));
this.retryCount = Integer.parseInt(params.getOrDefault(RETRY_COUNT,
"4"));
+ this.maxWriteBatchRows =
Integer.parseInt(params.getOrDefault(MAX_WRITE_BATCH_ROWS, "4096"));
}
@Override
@@ -184,9 +212,10 @@ public class MaxComputeJniWriter extends JniWriter {
allocator = new RootAllocator(Long.MAX_VALUE);
- // Create Arrow writer for this block
- WriterOptions writerOptions = WriterOptions.newBuilder()
+ // Cache writer options for creating new batchWriters in segments
+ writerOptions = WriterOptions.newBuilder()
.withSettings(settings)
+ .withCompressionCodec(CompressionCodec.ZSTD)
.build();
batchWriter = writeSession.createArrowWriter(blockId,
WriterAttemptId.of(0), writerOptions);
@@ -210,19 +239,44 @@ public class MaxComputeJniWriter extends JniWriter {
}
try {
- Object[][] data = inputTable.getMaterializedData();
+ // Stream data directly from off-heap VectorColumn to Arrow
vectors.
+ // Unlike the previous getMaterializedData() approach that created
+ // Object[][] (with String objects for STRING columns causing 3x
memory
+ // amplification), this reads bytes directly from VectorColumn and
writes
+ // to Arrow, keeping peak heap usage per batch to O(batch_rows *
row_size)
+ // instead of O(2 * batch_rows * row_size).
+ int rowOffset = 0;
+ while (rowOffset < numRows) {
+ int batchRows = Math.min(maxWriteBatchRows, numRows -
rowOffset);
- // Get a pre-allocated VectorSchemaRoot from the batch writer
- VectorSchemaRoot root = batchWriter.newElement();
- root.setRowCount(numRows);
+ // For variable-width columns, check byte budget to avoid
Arrow int32 overflow
+ batchRows = limitWriteBatchByBytesStreaming(inputTable,
numCols,
+ rowOffset, batchRows);
- for (int col = 0; col < numCols && col < columnTypeInfos.size();
col++) {
- OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
- fillArrowVector(root, col, odpsType, data[col], numRows);
- }
+ VectorSchemaRoot root = batchWriter.newElement();
+ try {
+ root.setRowCount(batchRows);
- batchWriter.write(root);
- writtenRows += numRows;
+ for (int col = 0; col < numCols && col <
columnTypeInfos.size(); col++) {
+ OdpsType odpsType =
columnTypeInfos.get(col).getOdpsType();
+ fillArrowVectorStreaming(root, col, odpsType,
+ inputTable.getColumn(col), rowOffset,
batchRows);
+ }
+
+ batchWriter.write(root);
+ } finally {
+ root.close();
+ }
+
+ writtenRows += batchRows;
+ segmentRows += batchRows;
+ rowOffset += batchRows;
+
+ // Segmented commit: rotate batchWriter to release SDK native
memory
+ if (segmentRows >= ROWS_PER_SEGMENT) {
+ rotateBatchWriter();
+ }
+ }
} catch (Exception e) {
String errorMsg = "Failed to write data to MaxCompute table " +
project + "." + tableName;
LOG.error(errorMsg, e);
@@ -230,17 +284,315 @@ public class MaxComputeJniWriter extends JniWriter {
}
}
+ /**
+ * Commit current batchWriter and create a new one with a fresh blockId.
+ * This forces the MaxCompute SDK to flush and release internal native
memory
+ * buffers that accumulate during writes. Without rotation, the SDK holds
all
+ * serialized Arrow data in native memory until close(), causing process
RSS
+ * to grow linearly with total data volume.
+ */
+ private void rotateBatchWriter() throws IOException {
+ try {
+ // 1. Commit current batchWriter and save its commit message
+ WriterCommitMessage msg = batchWriter.commit();
+ commitMessages.add(msg);
+ batchWriter = null;
+
+ // 2. Close current allocator to release Arrow native memory
+ allocator.close();
+ allocator = null;
+
+ // 3. Create new allocator and batchWriter with a new blockId
+ long newBlockId = nextBlockId++;
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ batchWriter = writeSession.createArrowWriter(newBlockId,
+ WriterAttemptId.of(0), writerOptions);
+
+ LOG.info("Rotated batchWriter: oldBlockId=" + blockId + ",
newBlockId=" + newBlockId
+ + ", totalCommitMessages=" + commitMessages.size()
+ + ", totalWrittenRows=" + writtenRows);
+
+ blockId = newBlockId;
+ segmentRows = 0;
+ } catch (Exception e) {
+ throw new IOException("Failed to rotate batchWriter for table "
+ + project + "." + tableName, e);
+ }
+ }
+
+
+ private boolean isVariableWidthType(OdpsType type) {
+ return type == OdpsType.STRING || type == OdpsType.VARCHAR
+ || type == OdpsType.CHAR || type == OdpsType.BINARY;
+ }
+
+ /**
+ * Limit write batch size by estimating variable-width column bytes
directly
+ * from the off-heap VectorColumn, without materializing data to Java heap.
+ */
+ private int limitWriteBatchByBytesStreaming(VectorTable inputTable, int
numCols,
+ int rowOffset, int batchRows) {
+ for (int col = 0; col < numCols && col < columnTypeInfos.size();
col++) {
+ OdpsType odpsType = columnTypeInfos.get(col).getOdpsType();
+ if (!isVariableWidthType(odpsType)) {
+ continue;
+ }
+ VectorColumn vc = inputTable.getColumn(col);
+ batchRows = findMaxRowsForColumnStreaming(vc, rowOffset,
batchRows, MAX_ARROW_BATCH_BYTES);
+ if (batchRows <= 1) {
+ return Math.max(1, batchRows);
+ }
+ }
+ return batchRows;
+ }
+
+ /**
+ * Find the maximum number of rows (from rowOffset) whose total byte size
+ * fits within budget, by reading offset metadata directly from
VectorColumn.
+ */
+ private int findMaxRowsForColumnStreaming(VectorColumn vc, int rowOffset,
int maxRows, long budget) {
+ long totalBytes = estimateColumnBytesStreaming(vc, rowOffset, maxRows);
+ if (totalBytes <= budget) {
+ return maxRows;
+ }
+ int rows = maxRows;
+ while (rows > 1) {
+ rows = rows / 2;
+ totalBytes = estimateColumnBytesStreaming(vc, rowOffset, rows);
+ if (totalBytes <= budget) {
+ int lo = rows;
+ int hi = Math.min(rows * 2, maxRows);
+ while (lo < hi) {
+ int mid = lo + (hi - lo + 1) / 2;
+ if (estimateColumnBytesStreaming(vc, rowOffset, mid) <=
budget) {
+ lo = mid;
+ } else {
+ hi = mid - 1;
+ }
+ }
+ return lo;
+ }
+ }
+ return 1;
+ }
+
+ /**
+ * Estimate total bytes for a range of rows in a VectorColumn by reading
+ * the offset array directly from off-heap memory, without creating any
+ * byte[] objects. This is O(1) per row (just offset subtraction).
+ */
+ private long estimateColumnBytesStreaming(VectorColumn vc, int rowOffset,
int rows) {
+ long total = 0;
+ long offsetAddr = vc.offsetAddress();
+ for (int i = rowOffset; i < rowOffset + rows; i++) {
+ if (!vc.isNullAt(i)) {
+ // String offsets are stored as int32 in VectorColumn
+ int startOff = i == 0 ? 0
+ :
org.apache.doris.common.jni.utils.OffHeap.getInt(null, offsetAddr + 4L * (i -
1));
+ int endOff =
org.apache.doris.common.jni.utils.OffHeap.getInt(null, offsetAddr + 4L * i);
+ total += (endOff - startOff);
+ }
+ }
+ return total;
+ }
+
+ /**
+ * Fill an Arrow vector by reading data directly from a VectorColumn,
+ * one row at a time. For STRING columns, this reads bytes directly
+ * (getBytesWithOffset) instead of creating String objects, eliminating
+ * the String -> byte[] double-copy that caused heap exhaustion.
+ */
+ private void fillArrowVectorStreaming(VectorSchemaRoot root, int colIdx,
OdpsType odpsType,
+ VectorColumn vc, int rowOffset, int
numRows) {
+ switch (odpsType) {
+ case BOOLEAN: {
+ BitVector vec = (BitVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getBoolean(rowOffset + i) ? 1 : 0);
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case TINYINT: {
+ TinyIntVector vec = (TinyIntVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getByte(rowOffset + i));
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case SMALLINT: {
+ SmallIntVector vec = (SmallIntVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getShort(rowOffset + i));
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case INT: {
+ IntVector vec = (IntVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getInt(rowOffset + i));
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case BIGINT: {
+ BigIntVector vec = (BigIntVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getLong(rowOffset + i));
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case FLOAT: {
+ Float4Vector vec = (Float4Vector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getFloat(rowOffset + i));
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case DOUBLE: {
+ Float8Vector vec = (Float8Vector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getDouble(rowOffset + i));
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case DECIMAL: {
+ DecimalVector vec = (DecimalVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ vec.set(i, vc.getDecimal(rowOffset + i));
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case STRING:
+ case VARCHAR:
+ case CHAR: {
+ // KEY FIX: Read bytes directly from off-heap, no String
creation.
+ // Previously: getMaterializedData -> String[] ->
toString().getBytes() -> Arrow
+ // Now: getBytesWithOffset() -> byte[] -> Arrow (1 copy
instead of 3)
+ VarCharVector vec = (VarCharVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ byte[] bytes = vc.getBytesWithOffset(rowOffset + i);
+ vec.setSafe(i, bytes);
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case DATE: {
+ DateDayVector vec = (DateDayVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ LocalDate date = vc.getDate(rowOffset + i);
+ vec.set(i, (int) date.toEpochDay());
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case DATETIME:
+ case TIMESTAMP: {
+ TimeStampMilliVector vec = (TimeStampMilliVector)
root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ LocalDateTime dt = vc.getDateTime(rowOffset + i);
+ long millis =
dt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+ vec.set(i, millis);
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ case BINARY: {
+ VarBinaryVector vec = (VarBinaryVector) root.getVector(colIdx);
+ vec.allocateNew(numRows);
+ for (int i = 0; i < numRows; i++) {
+ if (vc.isNullAt(rowOffset + i)) {
+ vec.setNull(i);
+ } else {
+ byte[] bytes = vc.getBytesWithOffset(rowOffset + i);
+ vec.setSafe(i, bytes);
+ }
+ }
+ vec.setValueCount(numRows);
+ break;
+ }
+ default: {
+ // For complex types (ARRAY, MAP, STRUCT) and other types,
+ // fall back to object-based materialization for this column
only.
+ Object[] colData = vc.getObjectColumn(rowOffset, rowOffset +
numRows);
+ fillArrowVector(root, colIdx, odpsType, colData, 0, numRows);
+ break;
+ }
+ }
+ }
+
private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType
odpsType,
- Object[] colData, int numRows) {
+ Object[] colData, int rowOffset, int
numRows) {
switch (odpsType) {
case BOOLEAN: {
BitVector vec = (BitVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.set(i, (Boolean) colData[i] ? 1 : 0);
+ vec.set(i, (Boolean) colData[rowOffset + i] ? 1 : 0);
}
}
vec.setValueCount(numRows);
@@ -250,10 +602,10 @@ public class MaxComputeJniWriter extends JniWriter {
TinyIntVector vec = (TinyIntVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.set(i, ((Number) colData[i]).byteValue());
+ vec.set(i, ((Number) colData[rowOffset +
i]).byteValue());
}
}
vec.setValueCount(numRows);
@@ -263,10 +615,10 @@ public class MaxComputeJniWriter extends JniWriter {
SmallIntVector vec = (SmallIntVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.set(i, ((Number) colData[i]).shortValue());
+ vec.set(i, ((Number) colData[rowOffset +
i]).shortValue());
}
}
vec.setValueCount(numRows);
@@ -276,10 +628,10 @@ public class MaxComputeJniWriter extends JniWriter {
IntVector vec = (IntVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.set(i, ((Number) colData[i]).intValue());
+ vec.set(i, ((Number) colData[rowOffset +
i]).intValue());
}
}
vec.setValueCount(numRows);
@@ -289,10 +641,10 @@ public class MaxComputeJniWriter extends JniWriter {
BigIntVector vec = (BigIntVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.set(i, ((Number) colData[i]).longValue());
+ vec.set(i, ((Number) colData[rowOffset +
i]).longValue());
}
}
vec.setValueCount(numRows);
@@ -302,10 +654,10 @@ public class MaxComputeJniWriter extends JniWriter {
Float4Vector vec = (Float4Vector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.set(i, ((Number) colData[i]).floatValue());
+ vec.set(i, ((Number) colData[rowOffset +
i]).floatValue());
}
}
vec.setValueCount(numRows);
@@ -315,10 +667,10 @@ public class MaxComputeJniWriter extends JniWriter {
Float8Vector vec = (Float8Vector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.set(i, ((Number) colData[i]).doubleValue());
+ vec.set(i, ((Number) colData[rowOffset +
i]).doubleValue());
}
}
vec.setValueCount(numRows);
@@ -328,12 +680,12 @@ public class MaxComputeJniWriter extends JniWriter {
DecimalVector vec = (DecimalVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- BigDecimal bd = (colData[i] instanceof BigDecimal)
- ? (BigDecimal) colData[i]
- : new BigDecimal(colData[i].toString());
+ BigDecimal bd = (colData[rowOffset + i] instanceof
BigDecimal)
+ ? (BigDecimal) colData[rowOffset + i]
+ : new BigDecimal(colData[rowOffset +
i].toString());
vec.set(i, bd);
}
}
@@ -346,14 +698,14 @@ public class MaxComputeJniWriter extends JniWriter {
VarCharVector vec = (VarCharVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
byte[] bytes;
- if (colData[i] instanceof byte[]) {
- bytes = (byte[]) colData[i];
+ if (colData[rowOffset + i] instanceof byte[]) {
+ bytes = (byte[]) colData[rowOffset + i];
} else {
- bytes =
colData[i].toString().getBytes(StandardCharsets.UTF_8);
+ bytes = colData[rowOffset +
i].toString().getBytes(StandardCharsets.UTF_8);
}
vec.setSafe(i, bytes);
}
@@ -365,12 +717,12 @@ public class MaxComputeJniWriter extends JniWriter {
DateDayVector vec = (DateDayVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
- } else if (colData[i] instanceof LocalDate) {
- vec.set(i, (int) ((LocalDate)
colData[i]).toEpochDay());
+ } else if (colData[rowOffset + i] instanceof LocalDate) {
+ vec.set(i, (int) ((LocalDate) colData[rowOffset +
i]).toEpochDay());
} else {
- vec.set(i, (int)
LocalDate.parse(colData[i].toString()).toEpochDay());
+ vec.set(i, (int) LocalDate.parse(colData[rowOffset +
i].toString()).toEpochDay());
}
}
vec.setValueCount(numRows);
@@ -381,16 +733,16 @@ public class MaxComputeJniWriter extends JniWriter {
TimeStampMilliVector vec = (TimeStampMilliVector)
root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
- } else if (colData[i] instanceof LocalDateTime) {
- long millis = ((LocalDateTime) colData[i])
+ } else if (colData[rowOffset + i] instanceof
LocalDateTime) {
+ long millis = ((LocalDateTime) colData[rowOffset + i])
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
vec.set(i, millis);
- } else if (colData[i] instanceof java.sql.Timestamp) {
- vec.set(i, ((java.sql.Timestamp)
colData[i]).getTime());
+ } else if (colData[rowOffset + i] instanceof
java.sql.Timestamp) {
+ vec.set(i, ((java.sql.Timestamp) colData[rowOffset +
i]).getTime());
} else {
- long millis =
LocalDateTime.parse(colData[i].toString())
+ long millis = LocalDateTime.parse(colData[rowOffset +
i].toString())
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
vec.set(i, millis);
}
@@ -402,12 +754,12 @@ public class MaxComputeJniWriter extends JniWriter {
VarBinaryVector vec = (VarBinaryVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
- } else if (colData[i] instanceof byte[]) {
- vec.setSafe(i, (byte[]) colData[i]);
+ } else if (colData[rowOffset + i] instanceof byte[]) {
+ vec.setSafe(i, (byte[]) colData[rowOffset + i]);
} else {
- vec.setSafe(i,
colData[i].toString().getBytes(StandardCharsets.UTF_8));
+ vec.setSafe(i, colData[rowOffset +
i].toString().getBytes(StandardCharsets.UTF_8));
}
}
vec.setValueCount(numRows);
@@ -419,10 +771,10 @@ public class MaxComputeJniWriter extends JniWriter {
FieldVector dataVec = listVec.getDataVector();
int elemIdx = 0;
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
listVec.setNull(i);
} else {
- List<?> list = (List<?>) colData[i];
+ List<?> list = (List<?>) colData[rowOffset + i];
listVec.startNewValue(i);
for (Object elem : list) {
writeListElement(dataVec, elemIdx++, elem);
@@ -442,10 +794,10 @@ public class MaxComputeJniWriter extends JniWriter {
FieldVector valVec = structVec.getChildrenFromFields().get(1);
int elemIdx = 0;
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
mapVec.setNull(i);
} else {
- Map<?, ?> map = (Map<?, ?>) colData[i];
+ Map<?, ?> map = (Map<?, ?>) colData[rowOffset + i];
mapVec.startNewValue(i);
for (Map.Entry<?, ?> entry : map.entrySet()) {
structVec.setIndexDefined(elemIdx);
@@ -466,11 +818,11 @@ public class MaxComputeJniWriter extends JniWriter {
StructVector structVec = (StructVector) root.getVector(colIdx);
structVec.allocateNew();
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
structVec.setNull(i);
} else {
structVec.setIndexDefined(i);
- Map<?, ?> struct = (Map<?, ?>) colData[i];
+ Map<?, ?> struct = (Map<?, ?>) colData[rowOffset + i];
for (FieldVector childVec :
structVec.getChildrenFromFields()) {
Object val = struct.get(childVec.getName());
writeListElement(childVec, i, val);
@@ -488,10 +840,10 @@ public class MaxComputeJniWriter extends JniWriter {
VarCharVector vec = (VarCharVector) root.getVector(colIdx);
vec.allocateNew(numRows);
for (int i = 0; i < numRows; i++) {
- if (colData[i] == null) {
+ if (colData[rowOffset + i] == null) {
vec.setNull(i);
} else {
- vec.setSafe(i,
colData[i].toString().getBytes(StandardCharsets.UTF_8));
+ vec.setSafe(i, colData[rowOffset +
i].toString().getBytes(StandardCharsets.UTF_8));
}
}
vec.setValueCount(numRows);
@@ -580,23 +932,41 @@ public class MaxComputeJniWriter extends JniWriter {
@Override
public void close() throws IOException {
+ Exception firstException = null;
try {
+ // Commit the final segment's batchWriter
if (batchWriter != null) {
- commitMessage = batchWriter.commit();
- batchWriter = null;
+ try {
+ WriterCommitMessage msg = batchWriter.commit();
+ commitMessages.add(msg);
+ } catch (Exception e) {
+ firstException = e;
+ LOG.warn("Failed to commit batch writer for table " +
project + "." + tableName, e);
+ } finally {
+ batchWriter = null;
+ }
}
+ } finally {
if (allocator != null) {
- allocator.close();
- allocator = null;
+ try {
+ allocator.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close Arrow allocator (possible memory
leak)", e);
+ if (firstException == null) {
+ firstException = e;
+ }
+ } finally {
+ allocator = null;
+ }
}
- LOG.info("MaxComputeJniWriter closed: writeSessionId=" +
writeSessionId
- + ", partitionSpec=" + partitionSpec
- + ", writtenRows=" + writtenRows
- + ", blockId=" + blockId);
- } catch (Exception e) {
- String errorMsg = "Failed to close MaxCompute arrow writer";
- LOG.error(errorMsg, e);
- throw new IOException(errorMsg, e);
+ }
+ LOG.info("MaxComputeJniWriter closed: writeSessionId=" + writeSessionId
+ + ", partitionSpec=" + partitionSpec
+ + ", writtenRows=" + writtenRows
+ + ", totalSegments=" + commitMessages.size()
+ + ", blockId=" + blockId);
+ if (firstException != null) {
+ throw new IOException("Failed to close MaxCompute arrow writer",
firstException);
}
}
@@ -605,16 +975,18 @@ public class MaxComputeJniWriter extends JniWriter {
Map<String, String> stats = new HashMap<>();
stats.put("mc_partition_spec", partitionSpec != null ? partitionSpec :
"");
- // Serialize WriterCommitMessage to Base64
- if (commitMessage != null) {
+ // Serialize all WriterCommitMessages (one per segment) as a List
object.
+ if (!commitMessages.isEmpty()) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(commitMessage);
+ // Serialize the entire list as one object to avoid mixing
+ // writeInt/writeObject, which causes OptionalDataException
+ oos.writeObject(new java.util.ArrayList<>(commitMessages));
oos.close();
stats.put("mc_commit_message",
Base64.getEncoder().encodeToString(baos.toByteArray()));
} catch (IOException e) {
- LOG.error("Failed to serialize WriterCommitMessage", e);
+ LOG.error("Failed to serialize WriterCommitMessages", e);
}
}
@@ -625,3 +997,4 @@ public class MaxComputeJniWriter extends JniWriter {
return stats;
}
}
+
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
index 627f3bc03e2..9ca45b19ea8 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java
@@ -57,6 +57,12 @@ public class MCProperties {
public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
+ public static final String MAX_FIELD_SIZE = "mc.max_field_size_bytes";
+ public static final String DEFAULT_MAX_FIELD_SIZE = "8388608"; // 8 * 1024
* 1024 = 8MB
+
+ public static final String MAX_WRITE_BATCH_ROWS =
"mc.max_write_batch_rows";
+ public static final String DEFAULT_MAX_WRITE_BATCH_ROWS = "4096";
+
//withCrossPartition(true):
// Very friendly to scenarios where there are many partitions but
each partition is very small.
//withCrossPartition(false):
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
index 871312c476c..77200d47cec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
@@ -95,6 +95,7 @@ public class MCTransaction implements Transaction {
TableWriteSessionBuilder builder = new TableWriteSessionBuilder()
.identifier(tableId)
.withSettings(catalog.getSettings())
+ .withMaxFieldSize(catalog.getMaxFieldSize())
.withArrowOptions(ArrowOptions.newBuilder()
.withDatetimeUnit(TimestampUnit.MILLI)
.withTimestampUnit(TimestampUnit.MILLI)
@@ -136,9 +137,13 @@ public class MCTransaction implements Transaction {
byte[] bytes =
Base64.getDecoder().decode(data.getCommitMessage());
ByteArrayInputStream bais = new
ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais);
- WriterCommitMessage msg = (WriterCommitMessage)
ois.readObject();
+ // Deserialize as List<WriterCommitMessage> — supports segmented
+ // commits where one writer produces multiple commit messages
+ @SuppressWarnings("unchecked")
+ List<WriterCommitMessage> msgs =
+ (List<WriterCommitMessage>) ois.readObject();
+ allMessages.addAll(msgs);
ois.close();
- allMessages.add(msg);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
index 73eb62f5dc2..b3a15355b14 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
@@ -69,6 +69,8 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
private int connectTimeout;
private int readTimeout;
private int retryTimes;
+ private long maxFieldSize;
+ private int maxWriteBatchRows;
public boolean dateTimePredicatePushDown;
@@ -191,6 +193,10 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
props.getOrDefault(MCProperties.READ_TIMEOUT,
MCProperties.DEFAULT_READ_TIMEOUT));
retryTimes = Integer.parseInt(
props.getOrDefault(MCProperties.RETRY_COUNT,
MCProperties.DEFAULT_RETRY_COUNT));
+ maxFieldSize = Long.parseLong(
+ props.getOrDefault(MCProperties.MAX_FIELD_SIZE,
MCProperties.DEFAULT_MAX_FIELD_SIZE));
+ maxWriteBatchRows = Integer.parseInt(
+ props.getOrDefault(MCProperties.MAX_WRITE_BATCH_ROWS,
MCProperties.DEFAULT_MAX_WRITE_BATCH_ROWS));
RestOptions restOptions = RestOptions.newBuilder()
.withConnectTimeout(connectTimeout)
@@ -320,6 +326,16 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
return readTimeout;
}
+ public long getMaxFieldSize() {
+ makeSureInitialized();
+ return maxFieldSize;
+ }
+
+ public int getMaxWriteBatchRows() {
+ makeSureInitialized();
+ return maxWriteBatchRows;
+ }
+
public boolean getDateTimePredicatePushDown() {
return dateTimePredicatePushDown;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
index db9d7aec00b..c02a2553e79 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java
@@ -114,6 +114,19 @@ public class PhysicalMaxComputeTableSink<CHILD_TYPE
extends Plan> extends Physic
.map(Column::getName)
.collect(Collectors.toSet());
if (!partitionNames.isEmpty()) {
+ // Check if any partition column is present in cols (the bound
columns from SELECT).
+ // Static partition columns are excluded from cols by
BindSink.bindMaxComputeTableSink(),
+ // so if no partition column remains in cols, all partitions are
statically specified
+ // and we don't need sort/shuffle — all data goes to a single
known partition.
+ Set<String> colNames = cols.stream()
+ .map(Column::getName)
+ .collect(Collectors.toSet());
+ boolean hasDynamicPartition =
partitionNames.stream().anyMatch(colNames::contains);
+ if (!hasDynamicPartition) {
+ // All partition columns are statically specified, no sort
needed
+ return PhysicalProperties.SINK_RANDOM_PARTITIONED;
+ }
+
List<Integer> columnIdx = new ArrayList<>();
List<Column> fullSchema = targetTable.getFullSchema();
for (int i = 0; i < fullSchema.size(); i++) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
index fdb50245a8e..bf52e894628 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
@@ -74,6 +74,7 @@ public class MaxComputeTableSink extends
BaseExternalTableDataSink {
tSink.setConnectTimeout(catalog.getConnectTimeout());
tSink.setReadTimeout(catalog.getReadTimeout());
tSink.setRetryCount(catalog.getRetryTimes());
+ tSink.setMaxWriteBatchRows(catalog.getMaxWriteBatchRows());
// Partition columns
List<String> partitionColumnNames =
targetTable.getPartitionColumns().stream()
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 8add59d47af..02b064a4aa1 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -514,6 +514,7 @@ struct TMaxComputeTableSink {
14: optional list<string> partition_columns // partition column names for
dynamic partition
15: optional string write_session_id // Storage API write session
ID
16: optional map<string, string> properties // contains authentication
properties
+ 17: optional i32 max_write_batch_rows // max rows per Arrow batch
for write
}
struct TDataSink {
diff --git
a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
index 50b2cb76282..4b1de864f61 100644
---
a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
+++
b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out
@@ -44,3 +44,13 @@
8 name_8 0.08 20250101 r3
9 name_9 0.09 20250102 r1
+-- !props_count --
+2000
+
+-- !props_top5 --
+0 name_0
+1 name_1
+2 name_2
+3 name_3
+4 name_4
+
diff --git
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
index 6897a01125c..d45389e9ee0 100644
---
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
+++
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy
@@ -140,4 +140,53 @@ suite("test_mc_write_large_data", "p2,external") {
sql """DROP TABLE IF EXISTS internal.${internal_db}.${internal_tb}"""
sql """DROP DATABASE IF EXISTS internal.${internal_db}"""
}
+
+ // Test: mc.max_write_batch_rows and mc.max_field_size_bytes catalog
properties
+ String mc_catalog_props = "test_mc_write_large_data_props"
+ sql """drop catalog if exists ${mc_catalog_props}"""
+ sql """
+ CREATE CATALOG IF NOT EXISTS ${mc_catalog_props} PROPERTIES (
+ "type" = "max_compute",
+ "mc.default.project" = "${defaultProject}",
+ "mc.access_key" = "${ak}",
+ "mc.secret_key" = "${sk}",
+ "mc.endpoint" =
"http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api",
+ "mc.quota" = "pay-as-you-go",
+ "mc.enable.namespace.schema" = "true",
+ "mc.max_write_batch_rows" = "512",
+ "mc.max_field_size_bytes" = "4194304"
+ );
+ """
+
+ sql """switch ${mc_catalog_props}"""
+ String db_props = "mc_props_test_${uuid}"
+ sql """drop database if exists ${db_props}"""
+ sql """create database ${db_props}"""
+ sql """use ${db_props}"""
+
+ try {
+ String tb_props = "props_verify_${uuid}"
+ sql """DROP TABLE IF EXISTS ${tb_props}"""
+ sql """
+ CREATE TABLE ${tb_props} (
+ id INT,
+ name STRING
+ )
+ """
+
+ // Insert 2000 rows to exceed max_write_batch_rows=512 (will be split
into 4 batches)
+ sql """
+ INSERT INTO ${tb_props}
+ SELECT
+ number AS id,
+ concat('name_', cast(number AS STRING)) AS name
+ FROM numbers("number"="2000")
+ """
+
+ qt_props_count """ SELECT count(*) FROM ${tb_props} """
+ order_qt_props_top5 """ SELECT * FROM ${tb_props} ORDER BY id LIMIT 5
"""
+ } finally {
+ sql """drop database if exists ${mc_catalog_props}.${db_props}"""
+ sql """drop catalog if exists ${mc_catalog_props}"""
+ }
}
diff --git
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
index c4a06d8634e..0e4aafe7389 100644
---
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
+++
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy
@@ -66,6 +66,17 @@ suite("test_mc_write_static_partitions", "p2,external") {
order_qt_static_single_p1 """ SELECT * FROM ${tb1} WHERE ds =
'20250101' """
order_qt_static_single_p2 """ SELECT * FROM ${tb1} WHERE ds =
'20250102' """
+ // Explain: static partition INSERT should NOT have SORT node
+ explain {
+ sql("INSERT INTO ${tb1} PARTITION(ds='20250103') VALUES (4, 'd')")
+ notContains "SORT"
+ }
+ // Explain: static partition INSERT OVERWRITE should NOT have SORT node
+ explain {
+ sql("INSERT OVERWRITE TABLE ${tb1} PARTITION(ds='20250101') VALUES
(5, 'e')")
+ notContains "SORT"
+ }
+
// Test 2: Multi-level partition columns static partition INSERT INTO
String tb2 = "static_multi_${uuid}"
sql """DROP TABLE IF EXISTS ${tb2}"""
@@ -82,6 +93,12 @@ suite("test_mc_write_static_partitions", "p2,external") {
order_qt_static_multi_all """ SELECT * FROM ${tb2} """
order_qt_static_multi_bj """ SELECT * FROM ${tb2} WHERE region = 'bj'
"""
+ // Explain: all partition columns statically specified should NOT have
SORT node
+ explain {
+ sql("INSERT INTO ${tb2} PARTITION(ds='20250102', region='gz')
VALUES (4, 'v4')")
+ notContains "SORT"
+ }
+
test {
sql """ INSERT INTO ${tb2} PARTITION(ds='20250101', region='bj',
ds='20250102') VALUES (1, 'v1'), (2, 'v2');"""
exception "Duplicate partition column: ds"
@@ -111,6 +128,12 @@ suite("test_mc_write_static_partitions", "p2,external") {
sql """INSERT INTO ${tb3_dst} PARTITION(ds='20250201') SELECT id, name
FROM ${tb3_src}"""
order_qt_static_select """ SELECT * FROM ${tb3_dst} """
+ // Explain: static partition INSERT INTO SELECT should NOT have SORT
node
+ explain {
+ sql("INSERT INTO ${tb3_dst} PARTITION(ds='20250202') SELECT id,
name FROM ${tb3_src}")
+ notContains "SORT"
+ }
+
// Test 4: INSERT OVERWRITE static partition
String tb4 = "overwrite_part_${uuid}"
sql """DROP TABLE IF EXISTS ${tb4}"""
@@ -129,6 +152,7 @@ suite("test_mc_write_static_partitions", "p2,external") {
order_qt_overwrite_p2 """ SELECT * FROM ${tb4} WHERE ds = '20250102'
"""
// Test 5: Dynamic partition regression (ensure not broken)
+ // Dynamic partition: partition column 'ds' is in the data, SORT node
IS expected
String tb5 = "dynamic_reg_${uuid}"
sql """DROP TABLE IF EXISTS ${tb5}"""
sql """
@@ -141,6 +165,17 @@ suite("test_mc_write_static_partitions", "p2,external") {
sql """INSERT INTO ${tb5} VALUES (1, 'a', '20250101'), (2, 'b',
'20250102')"""
order_qt_dynamic_regression """ SELECT * FROM ${tb5} """
+ // Explain: dynamic partition INSERT should HAVE SORT node (partition
col in data)
+ explain {
+ sql("INSERT INTO ${tb5} VALUES (3, 'c', '20250103')")
+ contains "SORT"
+ }
+ // Explain: dynamic partition INSERT INTO SELECT should HAVE SORT node
+ explain {
+ sql("INSERT INTO ${tb5} SELECT * FROM ${tb3_src}")
+ contains "SORT"
+ }
+
// Test 6: INSERT OVERWRITE non-partitioned table
String tb6 = "overwrite_nopart_${uuid}"
sql """DROP TABLE IF EXISTS ${tb6}"""
@@ -153,6 +188,41 @@ suite("test_mc_write_static_partitions", "p2,external") {
sql """INSERT INTO ${tb6} VALUES (1, 'old')"""
sql """INSERT OVERWRITE TABLE ${tb6} VALUES (2, 'new')"""
order_qt_overwrite_no_part """ SELECT * FROM ${tb6} """
+
+ // Explain: non-partitioned table INSERT should NOT have SORT node
+ explain {
+ sql("INSERT INTO ${tb6} VALUES (3, 'val')")
+ notContains "SORT"
+ }
+ // Explain: non-partitioned table INSERT OVERWRITE should NOT have
SORT node
+ explain {
+ sql("INSERT OVERWRITE TABLE ${tb6} VALUES (4, 'val2')")
+ notContains "SORT"
+ }
+
+ // Test 7: Multi-level partition with partial static (only some
partition cols specified)
+ // When only some partition columns are statically specified (partial
static),
+ // the remaining partition columns are dynamic, so SORT node IS
expected
+ String tb7 = "partial_static_${uuid}"
+ sql """DROP TABLE IF EXISTS ${tb7}"""
+ sql """
+ CREATE TABLE ${tb7} (
+ id INT,
+ val STRING,
+ ds STRING,
+ region STRING
+ ) PARTITION BY (ds, region)()
+ """
+ // Explain: partial static partition (ds static, region dynamic)
should HAVE SORT node
+ explain {
+ sql("INSERT INTO ${tb7} PARTITION(ds='20250101') VALUES (1, 'v1',
'bj')")
+ contains "SORT"
+ }
+ // Explain: all static partition should NOT have SORT node
+ explain {
+ sql("INSERT INTO ${tb7} PARTITION(ds='20250101', region='bj')
VALUES (1, 'v1')")
+ notContains "SORT"
+ }
} finally {
sql """drop database if exists ${mc_catalog_name}.${db}"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]