Copilot commented on code in PR #653:
URL: https://github.com/apache/doris-flink-connector/pull/653#discussion_r3192679750
##########
flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java:
##########
@@ -202,6 +202,18 @@ public void writeRecord(String database, String table, byte[] record) {
         currentCacheBytes.addAndGet(bytes);
         getLock(bufferKey).readLock().unlock();
+        if (flushQueue.size() < executionOptions.getFlushQueueSize()
+                && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
+                        || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer full, flush: {}", flush);
+        } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
+                || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
+            // The buffer capacity exceeds the stream load limit, flush
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
Review Comment:
This reordering changes the write-path backpressure semantics, but there is
no regression test covering the case where a buffer crosses the flush threshold
while the global cache counter is already above `maxBlockedBytes`. Without that
coverage, it will be easy to reintroduce the writer hang described in #614 in a
future refactor.
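The two-tier flush decision in the hunk above could be pinned down with a small table-driven test. A minimal standalone sketch of that decision logic follows; the constants and method names here are illustrative placeholders, not the connector's actual API, so a real regression test would exercise `DorisBatchStreamLoad` itself with the options from `executionOptions`:

```java
// Hypothetical model of the flush-decision ordering from the diff above.
// Constants are illustrative; the connector reads them from executionOptions.
public class FlushDecisionSketch {
    static final long FLUSH_MAX_BYTES = 1024;       // stands in for getBufferFlushMaxBytes()
    static final long STREAM_LOAD_MAX_BYTES = 4096; // stands in for STREAM_LOAD_MAX_BYTES
    static final int FLUSH_QUEUE_SIZE = 2;          // stands in for getFlushQueueSize()

    static String decide(int queuedBuffers, long bufferBytes) {
        if (queuedBuffers < FLUSH_QUEUE_SIZE && bufferBytes >= FLUSH_MAX_BYTES) {
            return "flush";        // normal threshold flush while the queue has room
        } else if (bufferBytes >= STREAM_LOAD_MAX_BYTES) {
            return "forced-flush"; // hard stream-load limit: flush even with a full queue
        }
        return "buffer";           // keep accumulating
    }

    public static void main(String[] args) {
        System.out.println(decide(0, 2048)); // queue has room, over soft limit
        System.out.println(decide(2, 2048)); // queue full, below hard limit
        System.out.println(decide(2, 5000)); // queue full, over hard limit
    }
}
```

A regression test built on this shape would add the case the comment describes: the writer crossing the soft threshold while `currentCacheBytes` already exceeds `maxBlockedBytes`, asserting the write path still makes progress.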
##########
flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java:
##########
@@ -499,7 +497,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
                     OBJECT_MAPPER.readValue(loadResult, RespContent.class);
             if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                 long cacheByteBeforeFlush =
-                        currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+                        currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
Review Comment:
The counter fix relies on subtracting the buffered byte size instead of
`loadBytes`, but there is no regression test that simulates a successful load
where Doris reports fewer bytes than were buffered (for example with gzip
compression or partial columns). That scenario is the root cause of #614, so it
should be pinned down with a unit test to prevent the deadlock from returning.
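The leak the old code caused can be demonstrated with a standalone counter model. This is a hypothetical sketch, not the connector's code: `flushRound` stands in for one write-then-flush cycle, and the numbers are made up to show the drift when Doris reports fewer loaded bytes than were buffered:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the currentCacheBytes accounting bug: the writer
// adds the buffered size, but the old flusher subtracted the (possibly
// smaller) byte count reported by Doris, leaving phantom bytes behind.
public class CacheCounterModel {
    static final AtomicLong currentCacheBytes = new AtomicLong();

    // One write/flush cycle: add buffered bytes, subtract reported bytes.
    static long flushRound(long bufferedBytes, long reportedLoadBytes) {
        currentCacheBytes.addAndGet(bufferedBytes);
        // Bug variant: subtract what Doris reports, which can be smaller
        // than what was buffered (e.g. with gzip compression).
        currentCacheBytes.addAndGet(-reportedLoadBytes);
        return currentCacheBytes.get();
    }

    public static void main(String[] args) {
        long residual = 0;
        // Each round buffers 100 bytes but Doris reports only 60 loaded.
        for (int i = 0; i < 1000; i++) {
            residual = flushRound(100, 60);
        }
        // 40 phantom bytes leak per round; once the residue crosses
        // maxBlockedBytes, writers block forever even though all data
        // has actually been flushed.
        System.out.println(residual); // 40000
    }
}
```

A regression test for the fix would drive `DorisBatchStreamLoad` with a mocked load response whose `loadBytes` is below the buffered size, then assert `currentCacheBytes` returns to zero after the flush completes.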
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]