danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645335801



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
       Fine. How about changing the message to `The buffer size hits the threshold {}, but still flush the max size data bucket failed`?
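
       For illustration only, here is a minimal, self-contained sketch of the flow in the hunk above: pick the largest bucket, release its bytes and reset it only when the flush succeeds, and otherwise report the threshold in the warning. `FlushSketch`, `Bucket`, `Tracer`, and `flushLargest` are hypothetical stand-ins, not the actual `StreamWriteFunction` types. The sketch uses `Stream.max` instead of the full sort in the diff, and `String.format` with `%d` in place of the logger's `{}` placeholder so that it runs without a logging dependency.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins; not the real Hudi classes.
class FlushSketch {
  static final class Bucket {
    long totalSize;
    final boolean flushSucceeds; // simulates the result of flushBucket(...)

    Bucket(long totalSize, boolean flushSucceeds) {
      this.totalSize = totalSize;
      this.flushSucceeds = flushSucceeds;
    }

    boolean flush() {
      return flushSucceeds;
    }

    void reset() {
      totalSize = 0;
    }
  }

  static final class Tracer {
    long bufferSize;

    Tracer(long bufferSize) {
      this.bufferSize = bufferSize;
    }

    void countDown(long size) {
      bufferSize -= size;
    }
  }

  /** Returns the warning text if flushing the largest bucket failed, otherwise empty. */
  static Optional<String> flushLargest(List<Bucket> buckets, Tracer tracer, long threshold) {
    // Only the largest bucket is needed, so max() replaces sorting the whole list.
    Optional<Bucket> largest = buckets.stream()
        .max(Comparator.comparingLong((Bucket b) -> b.totalSize));
    if (!largest.isPresent()) {
      return Optional.empty();
    }
    Bucket bucket = largest.get();
    if (bucket.flush()) {
      // Release the bytes before reset() clears the bucket's size.
      tracer.countDown(bucket.totalSize);
      bucket.reset();
      return Optional.empty();
    }
    // The message carries the threshold that was hit, as suggested in this comment.
    return Optional.of(String.format(
        "The buffer size hits the threshold %d, but still flush the max size data bucket failed",
        threshold));
  }

  public static void main(String[] args) {
    List<Bucket> buckets = new ArrayList<>();
    buckets.add(new Bucket(10, true));
    buckets.add(new Bucket(30, false)); // largest bucket, and its flush fails
    Tracer tracer = new Tracer(40);
    System.out.println(flushLargest(buckets, tracer, 40).orElse("flushed"));
  }
}
```

       Returning the warning as a value keeps the sketch easy to check; in the real function the same branch would call `LOG.warn` with the threshold as its argument.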





