wenbingshen opened a new pull request, #10767:
URL: https://github.com/apache/hudi/pull/10767

   ### Change Logs
   
   1. In Insert mode, when the SubTask is restarted, the OperatorCoordinator is 
in the notifyCheckpointComplete of CheckpointId-100 for a long time. This may 
be due to the time-consuming processing of some tableService scanning hdfs, or 
the time-consuming hdfs execution encountered during Rollback and initInstant.
   2. At this time, ckp-meta/instantId.INFLIGHT is not completed, but the 
corresponding commit file has been submitted. At this time, the bootstrap event 
will be sent when the subTask restarts.
   3. After the OperatorCoordinator completes processing the 
notifyCheckpointComplete, it will create a new Instant, and the subTask will 
create the corresponding parquet file, etc. based on the Instant.
   4. OperatorCoordinator then processes the bootstrap event, creates another 
new Instant, and rolls back the Instant created in the third step. This causes 
OperatorCoordinator and Operator to begin to be inconsistent.
   
   This is related to Hudi's three-stage submission, including data snapshot, 
submit commit file, and submit ckp_meta file
   
   One solution is as shown in this PR submission. Another possible solution is 
that in Insert mode, when the Task is restarted, if the commit has been 
submitted but ckp_meta has not been submitted, the bootstrap event is not sent:
   ```java
   protected void sendBootstrapEvent() {
       int attemptId = getRuntimeContext().getAttemptNumber();
       if (attemptId > 0) {
         // either a partial or global failover, reuses the current inflight 
instant
         if (this.currentInstant != null && 
!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(currentInstant))
 {
           LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", 
taskID, this.currentInstant, attemptId);
           this.currentInstant = null;
           return;
         } else if (this.currentInstant != null) { // <== ****** Add this 
judgment ******
           LOG.info("Recover task[{}] when coordinator committing for instant 
[{}] is in progress with attemptId [{}]."
                   + " wait for this currentInstant commit complete.", taskID, 
this.currentInstant, attemptId);
           return;
         }
         // the JM may have also been rebooted, sends the bootstrap event either
       }
       
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
       LOG.info("Send bootstrap write metadata event to coordinator, 
task[{}].", taskID);
     }
   ```
   
   ### Impact
   
   Flink append mode.
   
   ### Risk level (write none, low medium or high below)
   
   high
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, 
config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the 
default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. 
Please create a Jira ticket, attach the
     ticket number here and follow the 
[instruction](https://hudi.apache.org/contribute/developer-setup#website) to 
make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to