This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 005e913403824fc6d5494bbefe8a370712656782 Author: XuQianJin-Stars <forwar...@apache.com> AuthorDate: Thu Nov 24 13:17:21 2022 +0800 [HUDI-5095] Flink: Stores a special watermark(flag) to identify the current progress of writing data --- .../hudi/sink/StreamWriteOperatorCoordinator.java | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 578bb10db5..4a3674ec29 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -511,28 +511,27 @@ public class StreamWriteOperatorCoordinator } setMinEventTime(); doCommit(instant, writeResults); - resetMinEventTime(); return true; } public void setMinEventTime() { if (commitEventTimeEnable) { - LOG.info("[setMinEventTime] receive event time for current commit: {} ", Arrays.stream(eventBuffer).map(WriteMetadataEvent::getMaxEventTime).map(String::valueOf) - .collect(Collectors.joining(", "))); - this.minEventTime = Arrays.stream(eventBuffer) + List<Long> eventTimes = Arrays.stream(eventBuffer) .filter(Objects::nonNull) - .filter(maxEventTime -> maxEventTime.getMaxEventTime() > 0) .map(WriteMetadataEvent::getMaxEventTime) - .min(Comparator.naturalOrder()) - .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE); + .filter(maxEventTime -> maxEventTime > 0) + .collect(Collectors.toList()); + + if (!eventTimes.isEmpty()) { + LOG.info("[setMinEventTime] receive event time for current commit: {} ", + eventTimes.stream().map(String::valueOf).collect(Collectors.joining(", "))); + this.minEventTime = eventTimes.stream().min(Comparator.naturalOrder()) + .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE); + } LOG.info("[setMinEventTime] minEventTime: {} ", this.minEventTime); } } - public void resetMinEventTime() { - this.minEventTime = Long.MAX_VALUE; - } - /** * Performs the actual commit action. */