This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c265ddcc4b5bb307de319cb303ac249c85f787c2 Author: Nicholas Jiang <programg...@163.com> AuthorDate: Fri Jan 6 20:32:42 2023 +0800 [HUDI-5506] StreamWriteOperatorCoordinator may not recommit with partial uncommitted write metadata event (#7611) (cherry picked from commit 3e49e4c26dae1080e2b1a9389a75f56464c167a5) --- .../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 8 ++++++-- .../hudi/sink/TestStreamWriteOperatorCoordinator.java | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f4acc2e83ad..8a913bf4298 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -397,7 +397,7 @@ public class StreamWriteOperatorCoordinator HoodieTimeline completedTimeline = StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants(); executor.execute(() -> { - if (instant.equals("") || completedTimeline.containsInstant(instant)) { + if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) { // the last instant committed successfully reset(); } else { @@ -415,7 +415,11 @@ public class StreamWriteOperatorCoordinator this.eventBuffer[event.getTaskID()] = event; if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) { // start to initialize the instant. - initInstant(event.getInstantTime()); + final String instant = Arrays.stream(eventBuffer) + .filter(evt -> evt.getWriteStatuses().size() > 0) + .findFirst().map(WriteMetadataEvent::getInstantTime) + .orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT); + initInstant(instant); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index d5d35f7494f..5e712363ac9 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -164,6 +164,22 @@ public class TestStreamWriteOperatorCoordinator { assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); } + @Test + public void testRecommitWithPartialUncommittedEvents() { + final CompletableFuture<byte[]> future = new CompletableFuture<>(); + coordinator.checkpointCoordinator(1, future); + String instant = coordinator.getInstant(); + String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); + assertNull(lastCompleted, "Returns early for empty write results"); + WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2); + event1.setBootstrap(true); + WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1); + coordinator.handleEventFromOperator(0, event1); + coordinator.handleEventFromOperator(1, event2); + lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); + assertThat("Recommits the instant with partial uncommitted events", lastCompleted, is(instant)); + } + @Test public void testHiveSyncInvoked() throws Exception { // reset