ksmou commented on code in PR #10230:
URL: https://github.com/apache/hudi/pull/10230#discussion_r1413331856


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java:
##########
@@ -185,6 +188,41 @@ public void testRecommitWithPartialUncommittedEvents() {
     assertThat("Recommits the instant with partial uncommitted events", 
lastCompleted, is(instant));
   }
 
+  @Test
+  public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws 
Exception {
+    // reset
+    reset();
+    // override the default configuration
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    OperatorCoordinator.Context context = new 
MockOperatorCoordinatorContext(new OperatorID(), 1);
+    coordinator = new StreamWriteOperatorCoordinator(conf, context);
+    coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
+
+    coordinator.getWriteClient().getConfig()
+        .setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY, 
HoodieFailedWritesCleaningPolicy.LAZY.name());
+    
assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
+
+    final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
+
+    // start one instant and not commit it
+    coordinator.handleEventFromOperator(0, event0);
+    String instant = coordinator.getInstant();
+    HoodieHeartbeatClient heartbeatClient = 
coordinator.getWriteClient().getHeartbeatClient();
+    assertNotEquals(null, heartbeatClient.getHeartbeat(instant), "Heartbeat 
should not null");
+
+    String basePath = tempFile.getAbsolutePath();
+    HoodieWrapperFileSystem fs = 
coordinator.getWriteClient().getHoodieTable().getMetaClient().getFs();
+
+    assertTrue(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant), 
"Heartbeat is existed");
+
+    // send bootstrap event to stop the heartbeat for this instant
+    WriteMetadataEvent event1 = WriteMetadataEvent.emptyBootstrap(0);
+    coordinator.handleEventFromOperator(0, event1);
+
+    assertFalse(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant), 
"Heartbeat is stopped and not existed");
+  }

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to