the-other-tim-brown commented on code in PR #13269:
URL: https://github.com/apache/hudi/pull/13269#discussion_r2077504424


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -940,46 +954,53 @@ public String startCommit(String actionType, 
HoodieTableMetaClient metaClient) {
    */
   public void startCommitWithTime(String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    startCommitWithTime(instantTime, metaClient.getCommitActionType(), 
metaClient);
+    startCommitWithTime(Option.of(instantTime), 
metaClient.getCommitActionType(), metaClient);
   }
 
   /**
    * Completes a new commit time for a write operation 
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified 
action.
    */
   public void startCommitWithTime(String instantTime, String actionType) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    startCommitWithTime(instantTime, actionType, metaClient);
+    startCommitWithTime(Option.of(instantTime), actionType, metaClient);
   }
 
   /**
    * Starts a new commit time for a write operation (insert/update/delete) 
with specified action.
    */
-  private void startCommitWithTime(String instantTime, String actionType, 
HoodieTableMetaClient metaClient) {
+  private String startCommitWithTime(Option<String> providedInstantTime, 
String actionType, HoodieTableMetaClient metaClient) {
     if (needsUpgrade(metaClient)) {
       // unclear what instant to use, since upgrade does have a given instant.
       executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient, 
Option.empty()));
     }
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.COMMIT_ACTION, () -> 
tableServiceClient.rollbackFailedWrites(metaClient));
 
-    LOG.info("Generate a new instant time: {} action: {}", instantTime, 
actionType);
-    // check there are no inflight restore before starting a new commit.
-    HoodieTimeline inflightRestoreTimeline = 
metaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
-    ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() == 0,
-        () -> "Found pending restore in active timeline. Please complete the 
restore fully before proceeding. As of now, "
-            + "table could be in an inconsistent state. Pending restores: "
-            + 
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
-
-    if (config.getFailedWritesCleanPolicy().isLazy()) {
-      this.heartbeatClient.start(instantTime);
-    }
+    txnManager.beginTransaction(Option.empty(), Option.empty());
+    String instantTime;
+    try {
+      instantTime = providedInstantTime.orElseGet(() -> 
createNewInstantTime(false));
+      LOG.info("Generate a new instant time: {} action: {}", instantTime, 
actionType);
+      // check there are no inflight restore before starting a new commit.
+      HoodieTimeline inflightRestoreTimeline = 
metaClient.reloadActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
+      ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() == 
0,
+          () -> "Found pending restore in active timeline. Please complete the 
restore fully before proceeding. As of now, "
+              + "table could be in an inconsistent state. Pending restores: "
+              + 
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
+
+      if (config.getFailedWritesCleanPolicy().isLazy()) {
+        this.heartbeatClient.start(instantTime);
+      }
 
-    if (ClusteringUtils.isClusteringOrReplaceCommitAction(actionType)) {
-      
metaClient.getActiveTimeline().createRequestedCommitWithReplaceMetadata(instantTime,
 actionType);
-    } else {
-      
metaClient.getActiveTimeline().createNewInstant(metaClient.createNewInstant(HoodieInstant.State.REQUESTED,
 actionType,
-          instantTime));
+      if (ClusteringUtils.isClusteringOrReplaceCommitAction(actionType)) {
+        
metaClient.getActiveTimeline().createRequestedCommitWithReplaceMetadata(instantTime,
 actionType);
+      } else {
+        
metaClient.getActiveTimeline().createNewInstant(metaClient.createNewInstant(HoodieInstant.State.REQUESTED,
 actionType, instantTime));

Review Comment:
   The requested time is used in the conflict detection code so this can make 
it challenging to debug errors that happening due to conflicts or due to 
failures in conflict detection. If you think about the sequencing of generating 
the timestamp and then writing it across two writers, you end up with 6 
possibilities in the current system because the generation and persistence are 
separate events vs 2 possibilities if we ensure the timeline is generated and 
then written in the same block. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to