wenbingshen opened a new pull request, #10767: URL: https://github.com/apache/hudi/pull/10767
### Change Logs 1. In Insert mode, when the SubTask is restarted, the OperatorCoordinator is in the notifyCheckpointComplete of CheckpointId-100 for a long time. This may be due to the time-consuming processing of some tableService scanning hdfs, or the time-consuming hdfs execution encountered during Rollback and initInstant. 2. At this time, ckp-meta/instantId.INFLIGHT is not completed, but the corresponding commit file has been submitted. At this time, the bootstrap event will be sent when the subTask restarts. 3. After the OperatorCoordinator completes processing the notifyCheckpointComplete, it will create a new Instant, and the subTask will create the corresponding parquet file, etc. based on the Instant. 4. OperatorCoordinator then processes the bootstrap event, creates another new Instant, and rolls back the Instant created in the third step. This causes OperatorCoordinator and Operator to begin to be inconsistent. This is related to Hudi's three-stage submission, including data snapshot, submit commit file, and submit ckp_meta file One solution is as shown in this PR submission. Another possible solution is that in Insert mode, when the Task is restarted, if the commit has been submitted but ckp_meta has not been submitted, the bootstrap event is not sent: ```java protected void sendBootstrapEvent() { int attemptId = getRuntimeContext().getAttemptNumber(); if (attemptId > 0) { // either a partial or global failover, reuses the current inflight instant if (this.currentInstant != null && !metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(currentInstant)) { LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId); this.currentInstant = null; return; } else if (this.currentInstant != null) { // <== ****** Add this judgment ****** LOG.info("Recover task[{}] when coordinator committing for instant [{}] is in progress with attemptId [{}]." + " wait for this currentInstant commit complete.", taskID, this.currentInstant, attemptId); return; } // the JM may have also been rebooted, sends the bootstrap event either } this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID)); LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); } ``` ### Impact Flink append mode. ### Risk level (write none, low medium or high below) high ### Documentation Update _Describe any necessary documentation update if there is any new feature, config, or user-facing change_ - _The config description must be updated if new configs are added or the default value of the configs are changed_ - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make changes to the website._ ### Contributor's checklist - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute) - [ ] Change Logs and Impact were stated clearly - [ ] Adequate tests were added if applicable - [ ] CI passed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org