danny0405 commented on code in PR #10230: URL: https://github.com/apache/hudi/pull/10230#discussion_r1413310745
########## hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java: ########## @@ -185,6 +188,41 @@ public void testRecommitWithPartialUncommittedEvents() { assertThat("Recommits the instant with partial uncommitted events", lastCompleted, is(instant)); } + @Test + public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws Exception { + // reset + reset(); + // override the default configuration + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + coordinator = new StreamWriteOperatorCoordinator(conf, context); + coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); + + coordinator.getWriteClient().getConfig() + .setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY, HoodieFailedWritesCleaningPolicy.LAZY.name()); + assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy()); + + final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); + + // start one instant and not commit it + coordinator.handleEventFromOperator(0, event0); + String instant = coordinator.getInstant(); + HoodieHeartbeatClient heartbeatClient = coordinator.getWriteClient().getHeartbeatClient(); + assertNotEquals(null, heartbeatClient.getHeartbeat(instant), "Heartbeat should not null"); + + String basePath = tempFile.getAbsolutePath(); + HoodieWrapperFileSystem fs = coordinator.getWriteClient().getHoodieTable().getMetaClient().getFs(); + + assertTrue(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant), "Heartbeat is existed"); + + // send bootstrap event to stop the heartbeat for this instant + WriteMetadataEvent event1 = WriteMetadataEvent.emptyBootstrap(0); + coordinator.handleEventFromOperator(0, event1); + + assertFalse(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant), "Heartbeat is stopped and not existed"); + } Review Comment: `Heartbeat is stopped and not existed` -> `Heartbeat is stopped and cleared` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org