danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645336213



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, 
b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but 
flushBucket still failed.");
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;

Review comment:
       Actually, the current code base should never hit this condition now: 
the coordinator would reuse the last instant if there is no write metadata to commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to