This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 00448264 [Fix](batch) prevent writer deadlock from currentCacheBytes drift (#653)
00448264 is described below

commit 00448264b8119d40c6184d75e2f6be3def16242f
Author: Adesh Nalpet Adimurthy <[email protected]>
AuthorDate: Tue May 5 23:30:38 2026 -0400

    [Fix](batch) prevent writer deadlock from currentCacheBytes drift (#653)
    
    DorisBatchStreamLoad increments currentCacheBytes by the client-side record
    bytes on insert, but decrements it by respContent.getLoadBytes() after a
    successful load. Whenever the BE-reported value is smaller than what the
    client buffered (for example with partial_columns=true or compress_type=gz),
    each load leaves a few bytes behind in the counter, so it drifts upward.
    
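    Over time this drift pushes currentCacheBytes above maxBlockedBytes, so
    writeRecord parks on block.await() forever even though bufferMap and
    flushQueue are empty. The job freezes without any exception; the only
    symptom is repeated "Cache full, waiting for flush" and
    "bufferMap is empty, no need to flush null" log messages.
    
    This change decrements currentCacheBytes by the flushed buffer's own size
    (buffer.getBufferSizeBytes()) instead of the BE-reported load bytes. It also
    runs the buffer-full flush check before waiting on the cache limit.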
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 28 ++++++++++------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git 
a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 4bc0c19d..fc21bb97 100644
--- 
a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -202,6 +202,18 @@ public class DorisBatchStreamLoad implements Serializable {
         currentCacheBytes.addAndGet(bytes);
         getLock(bufferKey).readLock().unlock();
 
+        if (flushQueue.size() < executionOptions.getFlushQueueSize()
+                && (buffer.getBufferSizeBytes() >= 
executionOptions.getBufferFlushMaxBytes()
+                        || buffer.getNumOfRecords() >= 
executionOptions.getBufferFlushMaxRows())) {
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer full, flush: {}", flush);
+        } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
+                || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
+            // The buffer capacity exceeds the stream load limit, flush
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer exceeding the limit, flush: {}", 
flush);
+        }
+
         if (currentCacheBytes.get() > maxBlockedBytes) {
             lock.lock();
             try {
@@ -220,20 +232,6 @@ public class DorisBatchStreamLoad implements Serializable {
                 lock.unlock();
             }
         }
-
-        // queue has space, flush according to the bufferMaxRows/bufferMaxBytes
-        if (flushQueue.size() < executionOptions.getFlushQueueSize()
-                && (buffer.getBufferSizeBytes() >= 
executionOptions.getBufferFlushMaxBytes()
-                        || buffer.getNumOfRecords() >= 
executionOptions.getBufferFlushMaxRows())) {
-            boolean flush = bufferFullFlush(bufferKey);
-            LOG.info("trigger flush by buffer full, flush: {}", flush);
-
-        } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
-                || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
-            // The buffer capacity exceeds the stream load limit, flush
-            boolean flush = bufferFullFlush(bufferKey);
-            LOG.info("trigger flush by buffer exceeding the limit, flush: {}", 
flush);
-        }
     }
 
     public boolean bufferFullFlush(String bufferKey) {
@@ -499,7 +497,7 @@ public class DorisBatchStreamLoad implements Serializable {
                                     OBJECT_MAPPER.readValue(loadResult, 
RespContent.class);
                             if 
(DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                                 long cacheByteBeforeFlush =
-                                        
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+                                        
currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
                                 LOG.info(
                                         "load success, cacheBeforeFlushBytes: 
{}, currentCacheBytes : {}",
                                         cacheByteBeforeFlush,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to