This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c94c331 fix buffer memory grow bug (#203)
c94c331 is described below
commit c94c3311e5f0c4e4b80300e808c4a1209e24efba
Author: wudi <[email protected]>
AuthorDate: Sun Oct 8 14:32:01 2023 +0800
fix buffer memory grow bug (#203)
Co-authored-by: wudi <>
---
.../doris/flink/sink/batch/BatchRecordBuffer.java | 20 ++++++++--------
.../flink/sink/batch/TestBatchRecordBuffer.java | 27 ++++++++++++++++++----
2 files changed, 33 insertions(+), 14 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 99876bb..5fa601d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -17,7 +17,6 @@
package org.apache.doris.flink.sink.batch;
-import org.apache.doris.flink.sink.writer.RecordBuffer;
import org.apache.flink.annotation.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,22 +59,25 @@ public class BatchRecordBuffer {
@VisibleForTesting
public void ensureCapacity(int length) {
- if(buffer.remaining() >= length){
+ int lineDelimiterSize = this.lineDelimiter.length;
+ if(buffer.remaining() - lineDelimiterSize >= length){
return;
}
int currentRemain = buffer.remaining();
int currentCapacity = buffer.capacity();
-
- int target = buffer.remaining() + length;
- int capacity = buffer.capacity();
- //grow 512kb each time
- target = Math.max(target, Math.min(capacity + 512 * 1024, capacity * 2));
- ByteBuffer tmp = ByteBuffer.allocate(target);
+ // add lineDelimiter length
+ int needed = length - buffer.remaining() + lineDelimiterSize;
+ // grow at least 1MB
+ long grow = Math.max(needed, 1024 * 1024);
+ // grow at least 50% of the current size
+ grow = Math.max(buffer.capacity() / 2, grow);
+ int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
+ ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
buffer.flip();
tmp.put(buffer);
buffer.clear();
buffer = tmp;
- LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, target);
+ LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, newCapacity);
}
public String getLabelName() {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
index 18cb79a..1139358 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
@@ -55,21 +55,38 @@ public class TestBatchRecordBuffer {
BatchRecordBuffer recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),1);
recordBuffer.ensureCapacity(10);
- Assert.assertEquals(recordBuffer.getBuffer().capacity(), 10 + 1);
+ //grow at least 1MB
+ Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1);
recordBuffer.ensureCapacity(100);
- Assert.assertEquals(recordBuffer.getBuffer().capacity(), 100 + 11);
+ Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1);
recordBuffer.ensureCapacity(1024);
- Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 + 111);
+ Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1);
+ //not need grow
recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),16);
- recordBuffer.ensureCapacity(16);
+ recordBuffer.ensureCapacity(15);
Assert.assertEquals(recordBuffer.getBuffer().capacity(), 16);
recordBuffer.insert("1234567890".getBytes(StandardCharsets.UTF_8));
recordBuffer.ensureCapacity(8);
- Assert.assertEquals(recordBuffer.getBuffer().capacity(), 32);
+ Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 16);
+
+ recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),10);
+ recordBuffer.insert("1234567".getBytes(StandardCharsets.UTF_8));
+ recordBuffer.insert("123456789012345678901234567890".getBytes(StandardCharsets.UTF_8));
+
+ //
+ recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),10);
+ recordBuffer.ensureCapacity(2 * 1024 * 1024);
+ Assert.assertEquals(recordBuffer.getBuffer().capacity(), 2 * 1024 * 1024 - 10 + 1 + 10);
+
+ //grow at least 50% of the current size
+ recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),5 * 1024 * 1024);
+ recordBuffer.insert(ByteBuffer.allocate(2 * 1024 * 1024).array());
+ recordBuffer.insert(ByteBuffer.allocate(3 * 1024 * 1024 + 1).array());
+ Assert.assertEquals(recordBuffer.getBuffer().capacity(), 5 * 1024 * 1024 + 5 * 1024 * 1024 / 2);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]