This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c265ddcc4b5bb307de319cb303ac249c85f787c2
Author: Nicholas Jiang <programg...@163.com>
AuthorDate: Fri Jan 6 20:32:42 2023 +0800

    [HUDI-5506] StreamWriteOperatorCoordinator may not recommit with partial uncommitted write metadata event (#7611)
    
    (cherry picked from commit 3e49e4c26dae1080e2b1a9389a75f56464c167a5)
---
 .../apache/hudi/sink/StreamWriteOperatorCoordinator.java |  8 ++++++--
 .../hudi/sink/TestStreamWriteOperatorCoordinator.java    | 16 ++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index f4acc2e83ad..8a913bf4298 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -397,7 +397,7 @@ public class StreamWriteOperatorCoordinator
     HoodieTimeline completedTimeline =
         StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
     executor.execute(() -> {
-      if (instant.equals("") || completedTimeline.containsInstant(instant)) {
+      if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
         // the last instant committed successfully
         reset();
       } else {
@@ -415,7 +415,11 @@ public class StreamWriteOperatorCoordinator
     this.eventBuffer[event.getTaskID()] = event;
     if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
       // start to initialize the instant.
-      initInstant(event.getInstantTime());
+      final String instant = Arrays.stream(eventBuffer)
+          .filter(evt -> evt.getWriteStatuses().size() > 0)
+          .findFirst().map(WriteMetadataEvent::getInstantTime)
+          .orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT);
+      initInstant(instant);
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index d5d35f7494f..5e712363ac9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -164,6 +164,22 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
   }
 
+  @Test
+  public void testRecommitWithPartialUncommittedEvents() {
+    final CompletableFuture<byte[]> future = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(1, future);
+    String instant = coordinator.getInstant();
+    String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    assertNull(lastCompleted, "Returns early for empty write results");
+    WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
+    event1.setBootstrap(true);
+    WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1);
+    coordinator.handleEventFromOperator(0, event1);
+    coordinator.handleEventFromOperator(1, event2);
+    lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    assertThat("Recommits the instant with partial uncommitted events", lastCompleted, is(instant));
+  }
+
   @Test
   public void testHiveSyncInvoked() throws Exception {
     // reset

Reply via email to