vinothchandar commented on code in PR #9617:
URL: https://github.com/apache/hudi/pull/9617#discussion_r1342547848


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java:
##########
@@ -261,7 +261,10 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
       // If publish the rollback to the timeline, we finally transition the inflight rollback
       // to complete in the data table timeline
       if (!skipTimelinePublish) {
-        table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
+        // NOTE: no need to lock here, since !skipTimelinePublish is always true,
+        // when skipLocking is false, txnManager above-mentioned should lock it.
+        // when skipLocking is true, the caller should have already held the lock.

Review Comment:
   Especially for cases like this, it's better to make the lock re-entrant 
than to skip locking, so we don't introduce code that depends on the caller's 
locking behavior, which is hard to debug.
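
   A minimal sketch of the re-entrancy idea, using an in-process 
`java.util.concurrent.locks.ReentrantLock` as a stand-in. The class and method 
names are hypothetical and are not Hudi code; a distributed lock provider would 
need its own re-entrancy support. The inner method always locks, so it is safe 
whether or not the caller already holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a timeline guarded by a re-entrant lock; not Hudi code.
final class ReentrantTimelineSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private int completedTransitions = 0;

  // Outer operation, analogous to a rollback that takes the lock itself.
  int rollback() {
    lock.lock();
    try {
      // The nested call locks again; the same thread re-enters without blocking.
      return transitionToComplete();
    } finally {
      lock.unlock();
    }
  }

  // Inner operation always locks, so no caller has to pass a "skip locking" flag.
  // Returns the hold count seen inside the critical section, for illustration.
  int transitionToComplete() {
    lock.lock();
    try {
      completedTransitions++;
      return lock.getHoldCount();
    } finally {
      lock.unlock();
    }
  }

  int completedTransitions() {
    return completedTransitions;
  }

  boolean isLocked() {
    return lock.isLocked();
  }
}
```

   Called through `rollback()`, the inner method observes a hold count of 2; 
called directly, it observes 1. In both cases the lock is fully released on 
return.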



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java:
##########
@@ -137,6 +137,7 @@ protected void completeClustering(
 
       LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
       table.getActiveTimeline().transitionReplaceInflightToComplete(
+          false,

Review Comment:
   Why do we skip locking here completely?



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.table.timeline.TimeGeneratorType;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+
+/**
+ * Configuration for Hoodie time generation.
+ */
+public class HoodieTimeGeneratorConfig extends HoodieConfig {
+
+  public static final String BASE_PATH = "hoodie.base.path";
+  private static final String LOCK_PROVIDER_KEY = LOCK_PREFIX + "provider";
+  private static final String DEFAULT_LOCK_PROVIDER = 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider";
+
+  public static final ConfigProperty<String> TIME_GENERATOR_TYPE = 
ConfigProperty
+      .key("hoodie.time.generator.type")
+      .defaultValue(TimeGeneratorType.LOCK_PROVIDER.name())
+      .withValidValues(TimeGeneratorType.LOCK_PROVIDER.name())
+      .sinceVersion("1.0.0")
+      .markAdvanced()
+      .withDocumentation("Time generator type, which is used to generate 
globally monotonically increasing timestamp");
+
+  public static final ConfigProperty<Long> MAX_EXPECTED_CLOCK_SKEW_MS = 
ConfigProperty
+      .key("hoodie.time.generator.max_expected_clock_skew_ms")
+      .defaultValue(20L)

Review Comment:
   I'd say something like 200ms is a safer default.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorType.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+/**
+ * Types of {@link TimeGenerator}.
+ */
+@EnumDescription("Time generator type, indicating the time generator class to 
use, that implements "
+    + "`org.apache.hudi.common.table.timeline.TimeGenerator`.")
+public enum TimeGeneratorType {
+
+  @EnumFieldDescription("A wait based time generator, holding a mutex lock 
with skew times to "
+      + "produce globally monotonically increasing timestamp")
+  LOCK_PROVIDER

Review Comment:
   Rename to `WAIT_BASED`? Calling it `LOCK_PROVIDER` is a bit confusing IMO, 
since that term already has other meanings in the code/docs. 
   In fact, call this something like `WAIT_TO_ADJUST_SKEW` to make it more 
explicit?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -573,54 +624,85 @@ public HoodieInstant 
transitionReplaceRequestedToInflight(HoodieInstant requeste
     ValidationUtils.checkArgument(requestedInstant.isRequested());
     HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, 
REPLACE_COMMIT_ACTION, requestedInstant.getTimestamp());
     // Then write to timeline
-    transitionState(requestedInstant, inflightInstant, data);
+    transitionPendingState(requestedInstant, inflightInstant, data);
     return inflightInstant;
   }
 
   /**
    * Transition replace inflight to Committed.
    *
+   * @param shouldLock Whether to hold the lock when performing transition
    * @param inflightInstant Inflight instant
    * @param data Extra Metadata
    * @return commit instant
    */
-  public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant 
inflightInstant, Option<byte[]> data) {
+  public HoodieInstant transitionReplaceInflightToComplete(boolean shouldLock,
+                                                           HoodieInstant 
inflightInstant, Option<byte[]> data) {
     
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
     ValidationUtils.checkArgument(inflightInstant.isInflight());
     HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, 
REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp());
     // Then write to timeline
-    transitionState(inflightInstant, commitInstant, data);
+    transitionStateToComplete(shouldLock, inflightInstant, commitInstant, 
data);
     return commitInstant;
   }
 
-  private void transitionState(HoodieInstant fromInstant, HoodieInstant 
toInstant, Option<byte[]> data) {
-    transitionState(fromInstant, toInstant, data, false);
+  private void transitionPendingState(HoodieInstant fromInstant, HoodieInstant 
toInstant, Option<byte[]> data) {
+    transitionPendingState(fromInstant, toInstant, data, false);
+  }
+
+  protected void transitionStateToComplete(boolean shouldLock, HoodieInstant 
fromInstant,
+                                           HoodieInstant toInstant, 
Option<byte[]> data) {
+    
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()),
 String.format("%s and %s are not consistent when transition state.", 
fromInstant, toInstant));
+    String fromInstantFileName = fromInstant.getFileName();
+    // Ensures old state exists in timeline
+    LOG.info("Checking for file exists ?" + 
getInstantFileNamePath(fromInstantFileName));
+    try {
+      if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
+        // Re-create the .inflight file by opening a new file and write the 
commit metadata in
+        createFileInMetaPath(fromInstantFileName, data, false);
+        Path fromInstantPath = getInstantFileNamePath(fromInstantFileName);
+        HoodieInstant instantWithCompletionTime = new 
HoodieInstant(toInstant.getState(), toInstant.getAction(),
+            toInstant.getTimestamp(), metaClient.createNewInstantTime(false));
+        Path toInstantPath = 
getInstantFileNamePath(instantWithCompletionTime.getFileName());
+        boolean success = metaClient.getFs().rename(fromInstantPath, 
toInstantPath);

Review Comment:
   Shouldn't we be avoiding renames on cloud storage? I think we should add 
higher-level APIs like 
   
   HoodieStorage.writeFileAtomic(..)  and implement them differently for HDFS 
and cloud storage, say.
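
   A rough sketch of what such a split could look like. `AtomicFileWriter` and 
both implementations are hypothetical names, not an existing Hudi API, and 
`java.nio.file` on a local disk stands in for both HDFS and an object store. It 
assumes the target path has a parent directory. On a real object store the 
"create once" variant would map to a conditional PUT; the local stand-in only 
shows the fail-if-exists contract, not true write atomicity.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical interface; the method name mirrors the suggestion above.
interface AtomicFileWriter {
  void writeFileAtomic(Path target, byte[] content) throws IOException;
}

// For file systems with a cheap atomic rename (e.g. HDFS, local disk):
// write to a temp file in the same directory, then rename into place.
final class RenameBasedWriter implements AtomicFileWriter {
  @Override
  public void writeFileAtomic(Path target, byte[] content) throws IOException {
    Path tmp = Files.createTempFile(target.getParent(), ".tmp-", ".part");
    try {
      Files.write(tmp, content);
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    } finally {
      // No-op after a successful move; cleans up the temp file on failure.
      Files.deleteIfExists(tmp);
    }
  }
}

// For stores where rename is a copy plus delete: create the final file
// directly and fail if it already exists, so no rename is ever issued.
final class CreateOnceWriter implements AtomicFileWriter {
  @Override
  public void writeFileAtomic(Path target, byte[] content) throws IOException {
    Files.write(target, content, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
  }
}
```

   Callers such as the timeline would then depend only on the interface, and 
the storage layer would pick the implementation.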



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -370,6 +377,28 @@ public synchronized HoodieActiveTimeline 
reloadActiveTimeline() {
     return activeTimeline;
   }
 
+  /**
+   * Returns next instant time in the correct format. Lock is enabled by 
default.
+   */
+  public String createNewInstantTime() {
+    return createNewInstantTime(true);
+  }
+
+  /**
+   * Returns next instant time in the correct format.
+   *
+   * @param shouldLock whether the lock should be enabled to get the instant 
time.
+   */
+  public String createNewInstantTime(boolean shouldLock) {

Review Comment:
   My worry is all the places where we pass `true`/`false`, and how that may 
make this hard to maintain.



##########
packaging/hudi-kafka-connect-bundle/pom.xml:
##########
@@ -107,6 +107,7 @@
                                     <include>com.lmax:disruptor</include>
                                     <include>com.github.davidmoten:guava-mini</include>
                                     <include>com.github.davidmoten:hilbert-curve</include>
+                                    <include>com.github.ben-manes.caffeine:caffeine</include>

Review Comment:
   separate PR?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -964,7 +976,7 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
       // We cannot create a deltaCommit at instantTime now because a future (rollback) block has already been written to the logFiles.
       // We need to choose a timestamp which would be a validInstantTime for MDT. This is either a commit timestamp completed on the dataset
       // or a timestamp with suffix which we use for MDT clean, compaction etc.
-      String syncCommitTime = HoodieTableMetadataUtil.createRestoreTimestamp(HoodieActiveTimeline.createNewInstantTime());
+      String syncCommitTime = HoodieTableMetadataUtil.createRestoreTimestamp(writeClient.createNewInstantTime(false));

Review Comment:
   Should we uniformly use (or not use) locking across the different 
invocations?



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.table.timeline.TimeGeneratorType;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+
+/**
+ * Configuration for Hoodie time generation.
+ */
+public class HoodieTimeGeneratorConfig extends HoodieConfig {
+
+  public static final String BASE_PATH = "hoodie.base.path";
+  private static final String LOCK_PROVIDER_KEY = LOCK_PREFIX + "provider";
+  private static final String DEFAULT_LOCK_PROVIDER = 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider";
+
+  public static final ConfigProperty<String> TIME_GENERATOR_TYPE = 
ConfigProperty
+      .key("hoodie.time.generator.type")
+      .defaultValue(TimeGeneratorType.LOCK_PROVIDER.name())
+      .withValidValues(TimeGeneratorType.LOCK_PROVIDER.name())
+      .sinceVersion("1.0.0")
+      .markAdvanced()
+      .withDocumentation("Time generator type, which is used to generate 
globally monotonically increasing timestamp");
+
+  public static final ConfigProperty<Long> MAX_EXPECTED_CLOCK_SKEW_MS = 
ConfigProperty
+      .key("hoodie.time.generator.max_expected_clock_skew_ms")
+      .defaultValue(20L)
+      .withInferFunction(cfg -> {
+        if (DEFAULT_LOCK_PROVIDER.equals(cfg.getString(LOCK_PROVIDER_KEY))) {
+          return Option.of(1L);
+        }
+        return Option.empty();
+      })
+      .sinceVersion("1.0.0")
+      .markAdvanced()
+      .withDocumentation("The max expected clock skew time for 
WaitBasedTimeGenerator");

Review Comment:
   The max expected clock skew time for WaitBasedTimeGenerator **in ms** ?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorBase.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * Base time generator facility that maintains lock-related utilities.
+ */
+public abstract class TimeGeneratorBase implements TimeGenerator, Serializable 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TimeGeneratorBase.class);
+
+  /**
+   * The lock provider.
+   */
+  private volatile LockProvider lockProvider;
+  /**
+   * The maximum times to retry in case there are failures.
+   */
+  private final int maxRetries;
+  /**
+   * The maximum time to wait for each time generation to resolve the clock 
skew issue on distributed hosts.
+   */
+  private final long maxWaitTimeInMs;
+  /**
+   * The maximum time to block for acquiring a lock.
+   */
+  private final int lockAcquireWaitTimeInMs;
+
+  protected final HoodieTimeGeneratorConfig config;
+  private final LockConfiguration lockConfiguration;
+
+  /**
+   * The hadoop configuration.
+   */
+  private final SerializableConfiguration hadoopConf;
+
+  public TimeGeneratorBase(HoodieTimeGeneratorConfig config, 
SerializableConfiguration hadoopConf) {
+    this.config = config;
+    this.lockConfiguration = config.getLockConfiguration();
+    this.hadoopConf = hadoopConf;
+
+    maxRetries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+        Integer.parseInt(DEFAULT_LOCK_ACQUIRE_NUM_RETRIES));
+    lockAcquireWaitTimeInMs = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
+        Integer.parseInt(DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS));
+    maxWaitTimeInMs = 
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+        Long.parseLong(DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS));
+  }
+
+  protected LockProvider getLockProvider() {
+    // Perform lazy initialization of lock provider only if needed
+    if (lockProvider == null) {
+      synchronized (this) {
+        if (lockProvider == null) {
+          String lockProviderClass = 
lockConfiguration.getConfig().getString("hoodie.write.lock.provider");
+          LOG.info("LockProvider for TimeGenerator: " + lockProviderClass);
+          lockProvider = (LockProvider) 
ReflectionUtils.loadClass(lockProviderClass,
+              lockConfiguration, hadoopConf.get());
+        }
+      }
+    }
+    return lockProvider;
+  }
+
+  public void lock() {

Review Comment:
   Is there a common place to move this logic to? I guess this would be common 
to all locking code, right?



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.table.timeline.TimeGeneratorType;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+
+/**
+ * Configuration for Hoodie time generation.
+ */
+public class HoodieTimeGeneratorConfig extends HoodieConfig {
+
+  public static final String BASE_PATH = "hoodie.base.path";

Review Comment:
   move this to `HoodieCommonConfig` and reference in `HoodieWriteConfig` and 
here?



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -187,7 +188,11 @@ public static String maskWithoutFileId(String instantTime, int taskPartitionId)
   }
 
   public static String getCommitFromCommitFile(String commitFileName) {
-    return commitFileName.split("\\.")[0];
+    try {
+      return HoodieInstant.extractTimestamp(commitFileName);
+    } catch (IllegalArgumentException e) {
+      return "";

Review Comment:
   should we not throw e?
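
   To illustrate the difference, here is a small sketch with a hypothetical 
parser standing in for `HoodieInstant.extractTimestamp` (the real parsing rules 
are not reproduced here; this one just assumes the timestamp is the run of 
digits before the first dot). Returning `""` pushes the failure to whichever 
caller later trips over the empty string, while rethrowing fails at the source.

```java
// Hypothetical stand-in; not the Hudi implementation.
final class CommitFileNameSketch {

  // Assumed rule for illustration: the timestamp is the digits before the first '.'.
  static String extractTimestamp(String fileName) {
    int dot = fileName.indexOf('.');
    String candidate = dot < 0 ? fileName : fileName.substring(0, dot);
    if (candidate.isEmpty() || !candidate.chars().allMatch(Character::isDigit)) {
      throw new IllegalArgumentException("Not a commit file name: " + fileName);
    }
    return candidate;
  }

  // Variant shaped like the diff: the failure is hidden behind an empty string.
  static String getCommitOrEmpty(String fileName) {
    try {
      return extractTimestamp(fileName);
    } catch (IllegalArgumentException e) {
      return "";
    }
  }

  // Variant that lets the exception propagate to the caller.
  static String getCommitOrThrow(String fileName) {
    return extractTimestamp(fileName);
  }
}
```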



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;

Review Comment:
   is this a new dep?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -370,6 +377,28 @@ public synchronized HoodieActiveTimeline 
reloadActiveTimeline() {
     return activeTimeline;
   }
 
+  /**
+   * Returns next instant time in the correct format. Lock is enabled by 
default.
+   */
+  public String createNewInstantTime() {
+    return createNewInstantTime(true);
+  }
+
+  /**
+   * Returns next instant time in the correct format.
+   *
+   * @param shouldLock whether the lock should be enabled to get the instant 
time.
+   */
+  public String createNewInstantTime(boolean shouldLock) {

Review Comment:
   I am wondering whether we can simplify all the `shouldLock` passing somehow. 
Would it be possible to just pass the lock configuration down and let the 
`TimeGenerator` decide? The default could be the in-process lock provider?
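
   A minimal sketch of that shape, under stated assumptions: the class below is 
hypothetical and is not Hudi's `TimeGenerator`, a plain 
`java.util.concurrent.locks.Lock` stands in for a configured lock provider, and 
the default constructor falls back to an in-process lock. Callers never pass a 
`shouldLock` flag; the generator always locks and waits out the expected clock 
skew before releasing.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; the generator owns the locking decision.
final class WaitBasedTimeGeneratorSketch {
  private final Lock lock;        // stand-in for a configured lock provider
  private final long maxSkewMs;   // maximum expected clock skew in milliseconds

  WaitBasedTimeGeneratorSketch(Lock lock, long maxSkewMs) {
    this.lock = lock;
    this.maxSkewMs = maxSkewMs;
  }

  // Default to an in-process lock when nothing else is configured.
  WaitBasedTimeGeneratorSketch(long maxSkewMs) {
    this(new ReentrantLock(), maxSkewMs);
  }

  long currentTimeMillis() throws InterruptedException {
    lock.lock();
    try {
      long now = System.currentTimeMillis();
      // Keep holding the lock for the skew window, so the next holder, even on
      // a host whose clock lags by up to maxSkewMs, reads a later time.
      Thread.sleep(maxSkewMs);
      return now;
    } finally {
      lock.unlock();
    }
  }
}
```

   With this shape, the single-writer case and the multi-writer case differ 
only in which lock is handed to the constructor.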



##########
hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java:
##########
@@ -35,19 +35,23 @@ public class LockConfiguration implements Serializable {
   public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "max_wait_time_ms_between_retry";
 
   public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "client.wait_time_ms_between_retry";
+  public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);

Review Comment:
   We probably need to turn these into `ConfigProperty`? (I don't know if there 
was a reason we couldn't do this.) @yihua may know.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -447,37 +490,41 @@ public HoodieInstant 
transitionLogCompactionRequestedToInflight(HoodieInstant re
     ValidationUtils.checkArgument(requestedInstant.isRequested());
     HoodieInstant inflightInstant =
         new HoodieInstant(State.INFLIGHT, LOG_COMPACTION_ACTION, 
requestedInstant.getTimestamp());
-    transitionState(requestedInstant, inflightInstant, Option.empty());
+    transitionPendingState(requestedInstant, inflightInstant, Option.empty());
     return inflightInstant;
   }
 
   /**
    * Transition Compaction State from inflight to Committed.
    *
+   * @param shouldLock Whether to hold the lock when performing transition
    * @param inflightInstant Inflight instant
    * @param data Extra Metadata
    * @return commit instant
    */
-  public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant 
inflightInstant, Option<byte[]> data) {
+  public HoodieInstant transitionCompactionInflightToComplete(boolean 
shouldLock, HoodieInstant inflightInstant,
+                                                              Option<byte[]> 
data) {
     
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
     ValidationUtils.checkArgument(inflightInstant.isInflight());
     HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, 
COMMIT_ACTION, inflightInstant.getTimestamp());
-    transitionState(inflightInstant, commitInstant, data);
+    transitionStateToComplete(shouldLock, inflightInstant, commitInstant, 
data);
     return commitInstant;
   }
 
   /**
    * Transition Log Compaction State from inflight to Committed.
    *
+   * @param shouldLock Whether to hold the lock when performing transition
    * @param inflightInstant Inflight instant
    * @param data Extra Metadata
    * @return commit instant
    */
-  public HoodieInstant transitionLogCompactionInflightToComplete(HoodieInstant 
inflightInstant, Option<byte[]> data) {
+  public HoodieInstant transitionLogCompactionInflightToComplete(boolean 
shouldLock,
+                                                                 HoodieInstant 
inflightInstant, Option<byte[]> data) {
     
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION));
     ValidationUtils.checkArgument(inflightInstant.isInflight());
     HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, 
DELTA_COMMIT_ACTION, inflightInstant.getTimestamp());
-    transitionState(inflightInstant, commitInstant, data);
+    transitionStateToComplete(shouldLock, inflightInstant, commitInstant, 
data);

Review Comment:
   Is this special handling of completed instants because we only generate a 
completion time for those?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -198,8 +204,23 @@ private void readObject(java.io.ObjectInputStream in) 
throws IOException, ClassN
     in.defaultReadObject();
   }
 
+  /**
+   * Create a complete instant and save to storage with a completion time.
+   * @param shouldLock whether the lock should be enabled.
+   * @param instant the complete instant.
+   */
+  public void createCompleteInstant(boolean shouldLock, HoodieInstant instant) 
{

Review Comment:
   what are the cases where `shouldLock` is true vs false?


