SteNicholas commented on code in PR #9175: URL: https://github.com/apache/hudi/pull/9175#discussion_r1261384632
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java: ########## @@ -407,16 +408,25 @@ private void startInstant() { private void initInstant(String instant) { HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants(); executor.execute(() -> { + boolean recommit = false; if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) { - // the last instant committed successfully + // We don't need to recommit for completed or failed writes reset(); } else { LOG.info("Recommit instant {}", instant); // Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired. if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) { writeClient.getHeartbeatClient().start(instant); } - commitInstant(instant); + recommit = commitInstant(instant); Review Comment: ```suggestion recommitted = commitInstant(instant); ``` ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java: ########## @@ -407,16 +408,25 @@ private void startInstant() { private void initInstant(String instant) { HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants(); executor.execute(() -> { + boolean recommit = false; if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) { - // the last instant committed successfully + // We don't need to recommit for completed or failed writes reset(); } else { LOG.info("Recommit instant {}", instant); // Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired. if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) { writeClient.getHeartbeatClient().start(instant); } - commitInstant(instant); + recommit = commitInstant(instant); + } + if (!recommit) { Review Comment: ```suggestion if (!recommitted) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org