danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645239288
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
flush => flushed
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
Can we just inline this flag, make it more concise.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize,
b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but
flushBucket still failed.");
+ }
Review comment:
Already logged in method `flushBucket`.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize,
b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but
flushBucket still failed.");
+ }
Review comment:
Fine, how about we change the message to `The buffer size hits the
threshold {}, but flushing the max size data bucket still failed`
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize,
b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but
flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
Actually, the current code base should never match this condition now ~
The coordinator would reuse the last instant if there is no write metadata to commit.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize,
b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but
flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
But we can still keep it as a protection.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]