danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645335801
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
Fine. How about changing the message to `The buffer size hits the threshold {}, but still flush the max size data bucket failed`?
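
For illustration only, here is a minimal, self-contained sketch of the guarded flush with the suggested message. It is not the actual `StreamWriteFunction` code: `FlushSketch`, `Bucket`, `flushable`, `warn`, and `flushLargest` are hypothetical stand-ins, and it assumes the `{}` placeholder is meant to be filled with the configured max buffer size.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class FlushSketch {
  /** Hypothetical stand-in for DataBucket: tracks only its buffered size. */
  static final class Bucket {
    long totalSize;
    final boolean flushable; // simulates whether flushBucket(...) would succeed

    Bucket(long totalSize, boolean flushable) {
      this.totalSize = totalSize;
      this.flushable = flushable;
    }
  }

  /** Collected warnings, so the sketch runs without a logging library. */
  static final List<String> WARNINGS = new ArrayList<>();

  /** Stand-in for a parameterized LOG.warn(template, arg): fills the "{}" placeholder. */
  static void warn(String template, Object arg) {
    WARNINGS.add(template.replace("{}", String.valueOf(arg)));
  }

  /**
   * Tries to flush the largest bucket and returns the remaining buffer size.
   * The size is counted down and the bucket reset only when the flush succeeds;
   * otherwise a warning is recorded and the state is left untouched.
   */
  static long flushLargest(List<Bucket> buckets, long bufferSize, long maxBufferSize) {
    Optional<Bucket> largest =
        buckets.stream().max(Comparator.comparingLong((Bucket b) -> b.totalSize));
    if (!largest.isPresent()) {
      return bufferSize;
    }
    Bucket bucket = largest.get();
    if (bucket.flushable) {
      bufferSize -= bucket.totalSize; // mirrors tracer.countDown(...)
      bucket.totalSize = 0;           // mirrors bucket.reset()
    } else {
      // Assumption: the placeholder carries the max buffer size threshold.
      warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed",
          maxBufferSize);
    }
    return bufferSize;
  }
}
```

Passing the threshold as an argument keeps the message template constant, which makes the warning easier to grep for in logs.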