This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b8b2241d17 [HUDI-7552] Remove the suffix for MDT table service 
instants (#10945)
8b8b2241d17 is described below

commit 8b8b2241d17f4d8b87253c006c79a2961d4668fe
Author: Danny Chan <[email protected]>
AuthorDate: Thu Apr 11 09:22:42 2024 +0800

    [HUDI-7552] Remove the suffix for MDT table service instants (#10945)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  24 ++++-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  36 +++----
 .../hudi/metadata/HoodieTableMetadataWriter.java   |   3 +-
 .../table/action/index/RunIndexActionExecutor.java |   2 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |   2 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 104 ++++-----------------
 .../sink/TestStreamWriteOperatorCoordinator.java   |   4 +-
 .../apache/hudi/utilities/TestHoodieIndexer.java   |   2 +-
 8 files changed, 62 insertions(+), 115 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index c472be33f3d..143da3e6084 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -57,6 +57,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLogCompactException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -885,7 +886,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
 
   /**
    * Rolls back the failed delta commits corresponding to the indexing action.
-   * Such delta commits are identified based on the suffix 
`METADATA_INDEXER_TIME_SUFFIX` ("004").
    * <p>
    * TODO(HUDI-5733): This should be cleaned up once the proper fix of 
rollbacks
    *  in the metadata table is landed.
@@ -894,17 +894,26 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
    */
   protected boolean rollbackFailedIndexingCommits() {
     HoodieTable table = createTable(config, hadoopConf);
-    List<String> instantsToRollback = 
getFailedIndexingCommitsToRollback(table.getMetaClient());
+    List<String> instantsToRollback = 
getFailedIndexingCommitsToRollbackForMetadataTable(table.getMetaClient());
     Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = 
getPendingRollbackInfos(table.getMetaClient());
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, 
Option.empty()));
     rollbackFailedWrites(pendingRollbacks);
     return !pendingRollbacks.isEmpty();
   }
 
-  protected List<String> 
getFailedIndexingCommitsToRollback(HoodieTableMetaClient metaClient) {
+  private List<String> 
getFailedIndexingCommitsToRollbackForMetadataTable(HoodieTableMetaClient 
metaClient) {
+    if (!isMetadataTable(metaClient.getBasePathV2().toString())) {
+      return Collections.emptyList();
+    }
+    HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+        
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+        .setConf(metaClient.getHadoopConf())
+        .build();
+    HoodieTimeline dataIndexTimeline = 
dataMetaClient.getActiveTimeline().filter(instant -> 
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
+
     Stream<HoodieInstant> inflightInstantsStream = 
metaClient.getCommitsTimeline()
         .filter(instant -> !instant.isCompleted()
-            && isIndexingCommit(instant.getTimestamp()))
+            && isIndexingCommit(dataIndexTimeline, instant.getTimestamp()))
         .getInstantsAsStream();
     return inflightInstantsStream.filter(instant -> {
       try {
@@ -963,11 +972,16 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
       // TODO(HUDI-5733): This should be cleaned up once the proper fix of 
rollbacks in the
       //  metadata table is landed.
       if (metaClient.isMetadataTable()) {
+        HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+            
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+            .setConf(metaClient.getHadoopConf())
+            .build();
+        HoodieTimeline dataIndexTimeline = 
dataMetaClient.getActiveTimeline().filter(instant -> 
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
         return 
inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
           if (curInstantTime.isPresent()) {
             return !entry.equals(curInstantTime.get());
           } else {
-            return !isIndexingCommit(entry);
+            return !isIndexingCommit(dataIndexTimeline, entry);
           }
         }).collect(Collectors.toList());
       }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ca9c72ac601..891cc88b9da 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -106,7 +106,6 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInfli
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRollbackTimestamp;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex;
@@ -128,8 +127,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
 
-  public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
-
   // Virtual keys support for metadata table. This Field is
   // from the metadata payload schema.
   private static final String RECORD_KEY_FIELD_NAME = 
HoodieMetadataPayload.KEY_FIELD_NAME;
@@ -487,8 +484,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * @return a unique timestamp for MDT
    */
   private String generateUniqueCommitInstantTime(String initializationTime) {
-    // if its initialized via Async indexer, we don't need to alter the init 
time
-    if (HoodieTableMetadataUtil.isIndexingCommit(initializationTime)) {
+    // if it's initialized via Async indexer, we don't need to alter the init 
time
+    HoodieTimeline dataIndexTimeline = 
dataMetaClient.getActiveTimeline().filter(instant -> 
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
+    if (HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline, 
initializationTime)) {
       return initializationTime;
     }
     // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
@@ -905,7 +903,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     return 
getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
   }
 
-  public void buildMetadataPartitions(HoodieEngineContext engineContext, 
List<HoodieIndexPartitionInfo> indexPartitionInfos) throws IOException {
+  public void buildMetadataPartitions(HoodieEngineContext engineContext, 
List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws 
IOException {
     if (indexPartitionInfos.isEmpty()) {
       LOG.warn("No partition to index in the plan");
       return;
@@ -932,7 +930,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, 
partitionPaths);
 
     // initialize partitions
-    
initializeFromFilesystem(HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime),
 partitionTypes, Option.empty());
+    initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
   /**
@@ -1085,8 +1083,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       // Even if we don't have any deleted files to sync, we still create an 
empty commit so that we can track the restore has completed.
       // We cannot create a deltaCommit at instantTime now because a future 
(rollback) block has already been written to the logFiles.
       // We need to choose a timestamp which would be a validInstantTime for 
MDT. This is either a commit timestamp completed on the dataset
-      // or a timestamp with suffix which we use for MDT clean, compaction etc.
-      String syncCommitTime = 
HoodieTableMetadataUtil.createRestoreTimestamp(writeClient.createNewInstantTime(false));
+      // or a new timestamp which we use for MDT clean, compaction etc.
+      String syncCommitTime = writeClient.createNewInstantTime(false);
       processAndCommit(syncCommitTime, () -> 
HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
           partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, 
syncCommitTime));
       closeInternal();
@@ -1120,8 +1118,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       HoodieInstant deltaCommitInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
       if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
         LOG.info("Rolling back MDT deltacommit " + 
commitToRollbackInstantTime);
-        String rollbackInstantTime = createRollbackTimestamp(instantTime);
-        if (!getWriteClient().rollback(commitToRollbackInstantTime, 
rollbackInstantTime)) {
+        if (!getWriteClient().rollback(commitToRollbackInstantTime, 
instantTime)) {
           throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + commitToRollbackInstantTime);
         }
       } else {
@@ -1318,12 +1315,11 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
         return;
       }
       // Check and run clean operations.
-      String latestDeltacommitTime = lastInstant.get()
-          .getTimestamp();
-      LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + 
", running clean operations.");
-      cleanIfNecessary(writeClient, latestDeltacommitTime);
+      cleanIfNecessary(writeClient);
       // Do timeline validation before scheduling compaction/logCompaction 
operations.
       if (validateCompactionScheduling()) {
+        String latestDeltacommitTime = lastInstant.get().getTimestamp();
+        LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + 
", running compaction operations.");
         compactIfNecessary(writeClient, latestDeltacommitTime);
       }
       writeClient.archive();
@@ -1377,10 +1373,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       LOG.info("Compaction is scheduled for timestamp " + 
compactionInstantTime);
       writeClient.compact(compactionInstantTime);
     } else if (metadataWriteConfig.isLogCompactionEnabled()) {
-      // Schedule and execute log compaction with suffixes based on the same 
instant time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than 
the last completed instant on the
-      // metadata table.
-      final String logCompactionInstantTime = 
HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
+      // Schedule and execute log compaction with new instant time.
+      final String logCompactionInstantTime = 
metadataMetaClient.createNewInstantTime(false);
       if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime))
 {
         LOG.info(String.format("Log compaction with same %s time is already 
present in the timeline.", logCompactionInstantTime));
       } else if 
(writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, 
Option.empty())) {
@@ -1390,7 +1384,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     }
   }
 
-  protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String 
instantTime) {
+  protected void cleanIfNecessary(BaseHoodieWriteClient writeClient) {
     Option<HoodieInstant> lastCompletedCompactionInstant = 
metadataMetaClient.reloadActiveTimeline()
         .getCommitTimeline().filterCompletedInstants().lastInstant();
     if (lastCompletedCompactionInstant.isPresent()
@@ -1407,7 +1401,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
+    writeClient.clean(metadataMetaClient.createNewInstantTime(false));
     writeClient.lazyRollbackFailedIndexing();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index e7c44866b95..e64407bb2c4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -43,8 +43,9 @@ public interface HoodieTableMetadataWriter extends 
Serializable, AutoCloseable {
    *
    * @param engineContext
    * @param indexPartitionInfos - information about partitions to build such 
as partition type and base instant time
+   * @param instantTime The async index instant time from data table
    */
-  void buildMetadataPartitions(HoodieEngineContext engineContext, 
List<HoodieIndexPartitionInfo> indexPartitionInfos) throws IOException;
+  void buildMetadataPartitions(HoodieEngineContext engineContext, 
List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws 
IOException;
 
   /**
    * Drop the given metadata partitions.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 94c4296e470..e956c5e70a2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -149,7 +149,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
           String indexUptoInstant = 
indexPartitionInfos.get(0).getIndexUptoInstant();
           LOG.info("Starting Index Building with base instant: " + 
indexUptoInstant);
           HoodieTimer timer = HoodieTimer.start();
-          metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+          metadataWriter.buildMetadataPartitions(context, indexPartitionInfos, 
indexInstant.getTimestamp());
           metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
 
           // get remaining instants to catchup
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 94a127ae926..4b029a2576c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -164,7 +164,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
     // reload timeline
     metadataMetaClient.reloadActiveTimeline();
-    
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().ifPresent(instant
 -> cleanIfNecessary(writeClient, instant.getTimestamp()));
+    cleanIfNecessary(writeClient);
     writeClient.archive();
 
     // Update total size of the metadata and count of base/log files
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b67faddc5fb..6821dfbbc0d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -131,7 +131,6 @@ import static 
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMOR
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
@@ -159,32 +158,6 @@ public class HoodieTableMetadataUtil {
       DoubleWrapper.class, FloatWrapper.class, LongWrapper.class,
       StringWrapper.class, TimeMicrosWrapper.class, 
TimestampMicrosWrapper.class));
 
-  // Suffix to use for various operations on MDT
-  private enum OperationSuffix {
-    COMPACTION("001"),
-    CLEAN("002"),
-    RESTORE("003"),
-    METADATA_INDEXER("004"),
-    LOG_COMPACTION("005"),
-    ROLLBACK("006");
-
-    static final Set<String> ALL_SUFFIXES = 
Arrays.stream(OperationSuffix.values()).map(o -> 
o.getSuffix()).collect(Collectors.toSet());
-
-    private final String suffix;
-
-    OperationSuffix(String suffix) {
-      this.suffix = suffix;
-    }
-
-    String getSuffix() {
-      return suffix;
-    }
-
-    static boolean isValidSuffix(String suffix) {
-      return ALL_SUFFIXES.contains(suffix);
-    }
-  }
-
   // This suffix and all after that are used for initialization of the various 
partitions. The unused suffixes lower than this value
   // are reserved for future operations on the MDT.
   private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // 
corresponds to "010";
@@ -1340,14 +1313,14 @@ public class HoodieTableMetadataUtil {
     // Only those log files which have a corresponding completed instant on 
the dataset should be read
     // This is because the metadata table is updated before the dataset 
instants are committed.
     HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
-    Set<String> validInstantTimestamps = 
datasetTimeline.filterCompletedInstants().getInstantsAsStream()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+    Set<String> datasetPendingInstants = 
datasetTimeline.filterInflightsAndRequested().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+    Set<String> validInstantTimestamps = 
datasetTimeline.filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
 
     // We should also add completed indexing delta commits in the metadata 
table, as they do not
     // have corresponding completed instant in the data table
     validInstantTimestamps.addAll(
         metadataMetaClient.getActiveTimeline()
-            .filter(instant -> instant.isCompleted() && 
isValidInstant(instant))
+            .filter(instant -> instant.isCompleted() && 
isValidInstant(datasetPendingInstants, instant))
             .getInstantsAsStream()
             .map(HoodieInstant::getTimestamp)
             .collect(Collectors.toList()));
@@ -1376,33 +1349,15 @@ public class HoodieTableMetadataUtil {
   /**
    * Checks if the Instant is a delta commit and has a valid suffix for 
operations on MDT.
    *
+   * @param datasetPendingInstants The dataset pending instants
    * @param instant {@code HoodieInstant} to check.
    * @return {@code true} if the instant is valid.
    */
-  public static boolean isValidInstant(HoodieInstant instant) {
-    // Should be a deltacommit
-    if (!instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
-      return false;
-    }
-
-    // Check correct length. The timestamp should have a suffix over the 
timeline's timestamp format.
-    final String instantTime = instant.getTimestamp();
-    if (!(instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
OperationSuffix.METADATA_INDEXER.getSuffix().length())) {
-      return false;
-    }
-
-    // Is this a fixed operations suffix
-    final String suffix = instantTime.substring(instantTime.length() - 3);
-    if (OperationSuffix.isValidSuffix(suffix)) {
-      return true;
-    }
-
-    // Is this a index init suffix?
-    if (suffix.compareTo(String.format("%03d", 
PARTITION_INITIALIZATION_TIME_SUFFIX)) >= 0) {
-      return true;
-    }
-
-    return false;
+  private static boolean isValidInstant(Set<String> datasetPendingInstants, 
HoodieInstant instant) {
+    // only includes a deltacommit,
+    // filter out any MDT instant that has a pending corresponding dataset 
instant,
+    // this comes from a case that one instant fails to commit after MDT had 
been committed.
+    return instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION) && 
!datasetPendingInstants.contains(instant.getTimestamp());
   }
 
   /**
@@ -1411,12 +1366,19 @@ public class HoodieTableMetadataUtil {
    * TODO(HUDI-5733): This should be cleaned up once the proper fix of 
rollbacks in the
    *  metadata table is landed.
    *
-   * @param instantTime Instant time to check.
+   * @param dataIndexTimeline The instant timeline comprised with index 
commits from data table.
+   * @param instant The metadata table instant to check.
    * @return {@code true} if from async indexer; {@code false} otherwise.
    */
-  public static boolean isIndexingCommit(String instantTime) {
-    return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
OperationSuffix.METADATA_INDEXER.getSuffix().length()
-        && instantTime.endsWith(OperationSuffix.METADATA_INDEXER.getSuffix());
+  public static boolean isIndexingCommit(
+      HoodieTimeline dataIndexTimeline,
+      String instant) {
+    // A data table index commit was written as a delta commit on metadata 
table, use the data table
+    // timeline for auxiliary check.
+
+    // If this is an MDT, the pending delta commit on the active timeline must also 
be active on the DT
+    // based on the fact that the MDT is committed before the DT.
+    return dataIndexTimeline.containsInstant(instant);
   }
 
   /**
@@ -1614,25 +1576,6 @@ public class HoodieTableMetadataUtil {
     return fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
   }
 
-  /**
-   * Create the timestamp for a clean operation on the metadata table.
-   */
-  public static String createCleanTimestamp(String timestamp) {
-    return timestamp + OperationSuffix.CLEAN.getSuffix();
-  }
-
-  public static String createRollbackTimestamp(String timestamp) {
-    return timestamp + OperationSuffix.ROLLBACK.getSuffix();
-  }
-
-  public static String createRestoreTimestamp(String timestamp) {
-    return timestamp + OperationSuffix.RESTORE.getSuffix();
-  }
-
-  public static String createAsyncIndexerTimestamp(String timestamp) {
-    return timestamp + OperationSuffix.METADATA_INDEXER.getSuffix();
-  }
-
   /**
    * Create the timestamp for an index initialization operation on the 
metadata table.
    * <p>
@@ -1643,13 +1586,6 @@ public class HoodieTableMetadataUtil {
     return String.format("%s%03d", timestamp, 
PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
   }
 
-  /**
-   * Create the timestamp for a compaction operation on the metadata table.
-   */
-  public static String createLogCompactionTimestamp(String timestamp) {
-    return timestamp + OperationSuffix.LOG_COMPACTION.getSuffix();
-  }
-
   /**
    * Estimates the file group count to use for a MDT partition.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 9a964dba520..09692222303 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -64,6 +64,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -400,7 +401,8 @@ public class TestStreamWriteOperatorCoordinator {
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = 
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
     assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(7));
-    assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), 
is(instant + "005"));
+    assertThat("The log compaction instant time should be new generated",
+        completedTimeline.nthFromLastInstant(1).get().getTimestamp(), 
not(instant));
     // log compaction is another delta commit
     assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), 
is(HoodieTimeline.DELTA_COMMIT_ACTION));
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index f637413b63d..96596c0fdc5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -207,7 +207,7 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
     HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
         context(), metadataConfig, metaClient.getBasePathV2().toString());
     HoodieTableMetaClient metadataMetaClient = 
metadata.getMetadataMetaClient();
-    String mdtCommitTime = 
HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime);
+    String mdtCommitTime = indexingInstant.getTimestamp();
     
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtCommitTime));
 
     // Reverts both instants to inflight state, to simulate inflight indexing 
instants

Reply via email to