This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 118bea95 [Improve] add buffer-map read-write-lock in batch mode (#555)
118bea95 is described below
commit 118bea95710d11ef00975ffaeb32eeb9e1d3d683
Author: nativeCat <[email protected]>
AuthorDate: Tue Feb 18 15:42:40 2025 +0800
[Improve] add buffer-map read-write-lock in batch mode (#555)
Add a per-key read-write lock around the get/remove operations on DorisBatchStreamLoad.bufferMap.
---
.../flink/sink/batch/DorisBatchStreamLoad.java | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 479fab64..267a121c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -63,7 +63,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -111,6 +113,7 @@ public class DorisBatchStreamLoad implements Serializable {
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
private final Lock lock = new ReentrantLock();
private final Condition block = lock.newCondition();
+ private final Map<String, ReadWriteLock> bufferMapLock = new
ConcurrentHashMap<>();
public DorisBatchStreamLoad(
DorisOptions dorisOptions,
@@ -181,7 +184,7 @@ public class DorisBatchStreamLoad implements Serializable {
public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);
-
+ getLock(bufferKey).readLock().lock();
BatchRecordBuffer buffer =
bufferMap.computeIfAbsent(
bufferKey,
@@ -194,6 +197,7 @@ public class DorisBatchStreamLoad implements Serializable {
int bytes = buffer.insert(record);
currentCacheBytes.addAndGet(bytes);
+ getLock(bufferKey).readLock().unlock();
if (currentCacheBytes.get() > maxBlockedBytes) {
lock.lock();
try {
@@ -283,11 +287,19 @@ public class DorisBatchStreamLoad implements Serializable
{
}
private synchronized void flushBuffer(String bufferKey) {
- BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+ BatchRecordBuffer buffer;
+ try {
+ getLock(bufferKey).writeLock().lock();
+ buffer = bufferMap.remove(bufferKey);
+ } finally {
+ getLock(bufferKey).writeLock().unlock();
+ }
+ if (buffer == null) {
+ return;
+ }
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
LOG.debug("flush buffer for key {} with label {}", bufferKey,
buffer.getLabelName());
putRecordToFlushQueue(buffer);
- bufferMap.remove(bufferKey);
}
private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
@@ -374,6 +386,10 @@ public class DorisBatchStreamLoad implements Serializable {
return true;
}
+ private ReadWriteLock getLock(String bufferKey) {
+ return bufferMapLock.computeIfAbsent(bufferKey, k -> new
ReentrantReadWriteLock());
+ }
+
class LoadAsyncExecutor implements Runnable {
private int flushQueueSize;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]