This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8b8b2241d17 [HUDI-7552] Remove the suffix for MDT table service
instants (#10945)
8b8b2241d17 is described below
commit 8b8b2241d17f4d8b87253c006c79a2961d4668fe
Author: Danny Chan <[email protected]>
AuthorDate: Thu Apr 11 09:22:42 2024 +0800
[HUDI-7552] Remove the suffix for MDT table service instants (#10945)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 24 ++++-
.../metadata/HoodieBackedTableMetadataWriter.java | 36 +++----
.../hudi/metadata/HoodieTableMetadataWriter.java | 3 +-
.../table/action/index/RunIndexActionExecutor.java | 2 +-
.../FlinkHoodieBackedTableMetadataWriter.java | 2 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 104 ++++-----------------
.../sink/TestStreamWriteOperatorCoordinator.java | 4 +-
.../apache/hudi/utilities/TestHoodieIndexer.java | 2 +-
8 files changed, 62 insertions(+), 115 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index c472be33f3d..143da3e6084 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -57,6 +57,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLogCompactException;
import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -885,7 +886,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
/**
* Rolls back the failed delta commits corresponding to the indexing action.
- * Such delta commits are identified based on the suffix
`METADATA_INDEXER_TIME_SUFFIX` ("004").
* <p>
* TODO(HUDI-5733): This should be cleaned up once the proper fix of
rollbacks
* in the metadata table is landed.
@@ -894,17 +894,26 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
*/
protected boolean rollbackFailedIndexingCommits() {
HoodieTable table = createTable(config, hadoopConf);
- List<String> instantsToRollback =
getFailedIndexingCommitsToRollback(table.getMetaClient());
+ List<String> instantsToRollback =
getFailedIndexingCommitsToRollbackForMetadataTable(table.getMetaClient());
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks =
getPendingRollbackInfos(table.getMetaClient());
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry,
Option.empty()));
rollbackFailedWrites(pendingRollbacks);
return !pendingRollbacks.isEmpty();
}
- protected List<String>
getFailedIndexingCommitsToRollback(HoodieTableMetaClient metaClient) {
+ private List<String>
getFailedIndexingCommitsToRollbackForMetadataTable(HoodieTableMetaClient
metaClient) {
+ if (!isMetadataTable(metaClient.getBasePathV2().toString())) {
+ return Collections.emptyList();
+ }
+ HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+ .setConf(metaClient.getHadoopConf())
+ .build();
+ HoodieTimeline dataIndexTimeline =
dataMetaClient.getActiveTimeline().filter(instant ->
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
+
Stream<HoodieInstant> inflightInstantsStream =
metaClient.getCommitsTimeline()
.filter(instant -> !instant.isCompleted()
- && isIndexingCommit(instant.getTimestamp()))
+ && isIndexingCommit(dataIndexTimeline, instant.getTimestamp()))
.getInstantsAsStream();
return inflightInstantsStream.filter(instant -> {
try {
@@ -963,11 +972,16 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
// TODO(HUDI-5733): This should be cleaned up once the proper fix of
rollbacks in the
// metadata table is landed.
if (metaClient.isMetadataTable()) {
+ HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+ .setConf(metaClient.getHadoopConf())
+ .build();
+ HoodieTimeline dataIndexTimeline =
dataMetaClient.getActiveTimeline().filter(instant ->
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
return
inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
if (curInstantTime.isPresent()) {
return !entry.equals(curInstantTime.get());
} else {
- return !isIndexingCommit(entry);
+ return !isIndexingCommit(dataIndexTimeline, entry);
}
}).collect(Collectors.toList());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ca9c72ac601..891cc88b9da 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -106,7 +106,6 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInfli
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
import static
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRollbackTimestamp;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex;
@@ -128,8 +127,6 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
private static final Logger LOG =
LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
- public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
-
// Virtual keys support for metadata table. This Field is
// from the metadata payload schema.
private static final String RECORD_KEY_FIELD_NAME =
HoodieMetadataPayload.KEY_FIELD_NAME;
@@ -487,8 +484,9 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* @return a unique timestamp for MDT
*/
private String generateUniqueCommitInstantTime(String initializationTime) {
- // if its initialized via Async indexer, we don't need to alter the init
time
- if (HoodieTableMetadataUtil.isIndexingCommit(initializationTime)) {
+ // if it's initialized via Async indexer, we don't need to alter the init
time
+ HoodieTimeline dataIndexTimeline =
dataMetaClient.getActiveTimeline().filter(instant ->
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
+ if (HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline,
initializationTime)) {
return initializationTime;
}
// Add suffix to initializationTime to find an unused instant time for the
next index initialization.
@@ -905,7 +903,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
return
getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
}
- public void buildMetadataPartitions(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) throws IOException {
+ public void buildMetadataPartitions(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws
IOException {
if (indexPartitionInfos.isEmpty()) {
LOG.warn("No partition to index in the plan");
return;
@@ -932,7 +930,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient,
partitionPaths);
// initialize partitions
-
initializeFromFilesystem(HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime),
partitionTypes, Option.empty());
+ initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
/**
@@ -1085,8 +1083,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// Even if we don't have any deleted files to sync, we still create an
empty commit so that we can track the restore has completed.
// We cannot create a deltaCommit at instantTime now because a future
(rollback) block has already been written to the logFiles.
// We need to choose a timestamp which would be a validInstantTime for
MDT. This is either a commit timestamp completed on the dataset
- // or a timestamp with suffix which we use for MDT clean, compaction etc.
- String syncCommitTime =
HoodieTableMetadataUtil.createRestoreTimestamp(writeClient.createNewInstantTime(false));
+ // or a new timestamp which we use for MDT clean, compaction etc.
+ String syncCommitTime = writeClient.createNewInstantTime(false);
processAndCommit(syncCommitTime, () ->
HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete,
syncCommitTime));
closeInternal();
@@ -1120,8 +1118,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
HoodieInstant deltaCommitInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
LOG.info("Rolling back MDT deltacommit " +
commitToRollbackInstantTime);
- String rollbackInstantTime = createRollbackTimestamp(instantTime);
- if (!getWriteClient().rollback(commitToRollbackInstantTime,
rollbackInstantTime)) {
+ if (!getWriteClient().rollback(commitToRollbackInstantTime,
instantTime)) {
throw new HoodieMetadataException("Failed to rollback deltacommit at
" + commitToRollbackInstantTime);
}
} else {
@@ -1318,12 +1315,11 @@ public abstract class
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
return;
}
// Check and run clean operations.
- String latestDeltacommitTime = lastInstant.get()
- .getTimestamp();
- LOG.info("Latest deltacommit time found is " + latestDeltacommitTime +
", running clean operations.");
- cleanIfNecessary(writeClient, latestDeltacommitTime);
+ cleanIfNecessary(writeClient);
// Do timeline validation before scheduling compaction/logCompaction
operations.
if (validateCompactionScheduling()) {
+ String latestDeltacommitTime = lastInstant.get().getTimestamp();
+ LOG.info("Latest deltacommit time found is " + latestDeltacommitTime +
", running compaction operations.");
compactIfNecessary(writeClient, latestDeltacommitTime);
}
writeClient.archive();
@@ -1377,10 +1373,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
LOG.info("Compaction is scheduled for timestamp " +
compactionInstantTime);
writeClient.compact(compactionInstantTime);
} else if (metadataWriteConfig.isLogCompactionEnabled()) {
- // Schedule and execute log compaction with suffixes based on the same
instant time. This ensures that any future
- // delta commits synced over will not have an instant time lesser than
the last completed instant on the
- // metadata table.
- final String logCompactionInstantTime =
HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
+ // Schedule and execute log compaction with new instant time.
+ final String logCompactionInstantTime =
metadataMetaClient.createNewInstantTime(false);
if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime))
{
LOG.info(String.format("Log compaction with same %s time is already
present in the timeline.", logCompactionInstantTime));
} else if
(writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime,
Option.empty())) {
@@ -1390,7 +1384,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
}
}
- protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String
instantTime) {
+ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient) {
Option<HoodieInstant> lastCompletedCompactionInstant =
metadataMetaClient.reloadActiveTimeline()
.getCommitTimeline().filterCompletedInstants().lastInstant();
if (lastCompletedCompactionInstant.isPresent()
@@ -1407,7 +1401,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// Trigger cleaning with suffixes based on the same instant time. This
ensures that any future
// delta commits synced over will not have an instant time lesser than the
last completed instant on the
// metadata table.
-
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
+ writeClient.clean(metadataMetaClient.createNewInstantTime(false));
writeClient.lazyRollbackFailedIndexing();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index e7c44866b95..e64407bb2c4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -43,8 +43,9 @@ public interface HoodieTableMetadataWriter extends
Serializable, AutoCloseable {
*
* @param engineContext
* @param indexPartitionInfos - information about partitions to build such
as partition type and base instant time
+ * @param instantTime The async index instant time from the data table
*/
- void buildMetadataPartitions(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) throws IOException;
+ void buildMetadataPartitions(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws
IOException;
/**
* Drop the given metadata partitions.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 94c4296e470..e956c5e70a2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -149,7 +149,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
LOG.info("Starting Index Building with base instant: " +
indexUptoInstant);
HoodieTimer timer = HoodieTimer.start();
- metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+ metadataWriter.buildMetadataPartitions(context, indexPartitionInfos,
indexInstant.getTimestamp());
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
// get remaining instants to catchup
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 94a127ae926..4b029a2576c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -164,7 +164,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
// reload timeline
metadataMetaClient.reloadActiveTimeline();
-
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().ifPresent(instant
-> cleanIfNecessary(writeClient, instant.getTimestamp()));
+ cleanIfNecessary(writeClient);
writeClient.archive();
// Update total size of the metadata and count of base/log files
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b67faddc5fb..6821dfbbc0d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -131,7 +131,6 @@ import static
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMOR
import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
@@ -159,32 +158,6 @@ public class HoodieTableMetadataUtil {
DoubleWrapper.class, FloatWrapper.class, LongWrapper.class,
StringWrapper.class, TimeMicrosWrapper.class,
TimestampMicrosWrapper.class));
- // Suffix to use for various operations on MDT
- private enum OperationSuffix {
- COMPACTION("001"),
- CLEAN("002"),
- RESTORE("003"),
- METADATA_INDEXER("004"),
- LOG_COMPACTION("005"),
- ROLLBACK("006");
-
- static final Set<String> ALL_SUFFIXES =
Arrays.stream(OperationSuffix.values()).map(o ->
o.getSuffix()).collect(Collectors.toSet());
-
- private final String suffix;
-
- OperationSuffix(String suffix) {
- this.suffix = suffix;
- }
-
- String getSuffix() {
- return suffix;
- }
-
- static boolean isValidSuffix(String suffix) {
- return ALL_SUFFIXES.contains(suffix);
- }
- }
-
// This suffix and all after that are used for initialization of the various
partitions. The unused suffixes lower than this value
// are reserved for future operations on the MDT.
private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; //
corresponds to "010";
@@ -1340,14 +1313,14 @@ public class HoodieTableMetadataUtil {
// Only those log files which have a corresponding completed instant on
the dataset should be read
// This is because the metadata table is updated before the dataset
instants are committed.
HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
- Set<String> validInstantTimestamps =
datasetTimeline.filterCompletedInstants().getInstantsAsStream()
- .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+ Set<String> datasetPendingInstants =
datasetTimeline.filterInflightsAndRequested().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+ Set<String> validInstantTimestamps =
datasetTimeline.filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
// We should also add completed indexing delta commits in the metadata
table, as they do not
// have corresponding completed instant in the data table
validInstantTimestamps.addAll(
metadataMetaClient.getActiveTimeline()
- .filter(instant -> instant.isCompleted() &&
isValidInstant(instant))
+ .filter(instant -> instant.isCompleted() &&
isValidInstant(datasetPendingInstants, instant))
.getInstantsAsStream()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()));
@@ -1376,33 +1349,15 @@ public class HoodieTableMetadataUtil {
/**
* Checks if the Instant is a delta commit and has a valid suffix for
operations on MDT.
*
+ * @param datasetPendingInstants The dataset pending instants
* @param instant {@code HoodieInstant} to check.
* @return {@code true} if the instant is valid.
*/
- public static boolean isValidInstant(HoodieInstant instant) {
- // Should be a deltacommit
- if (!instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
- return false;
- }
-
- // Check correct length. The timestamp should have a suffix over the
timeline's timestamp format.
- final String instantTime = instant.getTimestamp();
- if (!(instantTime.length() == MILLIS_INSTANT_ID_LENGTH +
OperationSuffix.METADATA_INDEXER.getSuffix().length())) {
- return false;
- }
-
- // Is this a fixed operations suffix
- final String suffix = instantTime.substring(instantTime.length() - 3);
- if (OperationSuffix.isValidSuffix(suffix)) {
- return true;
- }
-
- // Is this a index init suffix?
- if (suffix.compareTo(String.format("%03d",
PARTITION_INITIALIZATION_TIME_SUFFIX)) >= 0) {
- return true;
- }
-
- return false;
+ private static boolean isValidInstant(Set<String> datasetPendingInstants,
HoodieInstant instant) {
+ // only includes a deltacommit,
+ // filter out any MDT instant that has a pending correspondent dataset
instant,
+ // this handles the case where an instant fails to commit after the MDT
has been committed.
+ return instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION) &&
!datasetPendingInstants.contains(instant.getTimestamp());
}
/**
@@ -1411,12 +1366,19 @@ public class HoodieTableMetadataUtil {
* TODO(HUDI-5733): This should be cleaned up once the proper fix of
rollbacks in the
* metadata table is landed.
*
- * @param instantTime Instant time to check.
+ * @param dataIndexTimeline The instant timeline comprised with index
commits from data table.
+ * @param instant The metadata table instant to check.
* @return {@code true} if from async indexer; {@code false} otherwise.
*/
- public static boolean isIndexingCommit(String instantTime) {
- return instantTime.length() == MILLIS_INSTANT_ID_LENGTH +
OperationSuffix.METADATA_INDEXER.getSuffix().length()
- && instantTime.endsWith(OperationSuffix.METADATA_INDEXER.getSuffix());
+ public static boolean isIndexingCommit(
+ HoodieTimeline dataIndexTimeline,
+ String instant) {
+ // A data table index commit was written as a delta commit on metadata
table, use the data table
+ // timeline for an auxiliary check.
+
+ // If this is a MDT, the pending delta commit on active timeline must also
be active on the DT
+ // based on the fact that the MDT is committed before the DT.
+ return dataIndexTimeline.containsInstant(instant);
}
/**
@@ -1614,25 +1576,6 @@ public class HoodieTableMetadataUtil {
return fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
}
- /**
- * Create the timestamp for a clean operation on the metadata table.
- */
- public static String createCleanTimestamp(String timestamp) {
- return timestamp + OperationSuffix.CLEAN.getSuffix();
- }
-
- public static String createRollbackTimestamp(String timestamp) {
- return timestamp + OperationSuffix.ROLLBACK.getSuffix();
- }
-
- public static String createRestoreTimestamp(String timestamp) {
- return timestamp + OperationSuffix.RESTORE.getSuffix();
- }
-
- public static String createAsyncIndexerTimestamp(String timestamp) {
- return timestamp + OperationSuffix.METADATA_INDEXER.getSuffix();
- }
-
/**
* Create the timestamp for an index initialization operation on the
metadata table.
* <p>
@@ -1643,13 +1586,6 @@ public class HoodieTableMetadataUtil {
return String.format("%s%03d", timestamp,
PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
}
- /**
- * Create the timestamp for a compaction operation on the metadata table.
- */
- public static String createLogCompactionTimestamp(String timestamp) {
- return timestamp + OperationSuffix.LOG_COMPACTION.getSuffix();
- }
-
/**
* Estimates the file group count to use for a MDT partition.
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 9a964dba520..09692222303 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -64,6 +64,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -400,7 +401,8 @@ public class TestStreamWriteOperatorCoordinator {
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(7));
- assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(),
is(instant + "005"));
+ assertThat("The log compaction instant time should be newly generated",
+ completedTimeline.nthFromLastInstant(1).get().getTimestamp(),
not(instant));
// log compaction is another delta commit
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(),
is(HoodieTimeline.DELTA_COMMIT_ACTION));
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index f637413b63d..96596c0fdc5 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -207,7 +207,7 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
context(), metadataConfig, metaClient.getBasePathV2().toString());
HoodieTableMetaClient metadataMetaClient =
metadata.getMetadataMetaClient();
- String mdtCommitTime =
HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime);
+ String mdtCommitTime = indexingInstant.getTimestamp();
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtCommitTime));
// Reverts both instants to inflight state, to simulate inflight indexing
instants