This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 89b8ae02bf49afe412b7472b22ad4ffaef116a06
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Thu Aug 10 19:17:07 2023 -0700

    [HUDI-6679] Fix initialization of metadata table partitions upon failure (#9419)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   8 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   7 +-
 .../functional/TestHoodieBackedMetadata.java       | 123 ++++++++++++++++++++-
 3 files changed, 128 insertions(+), 10 deletions(-)
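
Summary, as read from the diff: a failure partway through initializing a metadata table (MDT) partition previously left that partition marked inflight yet permanently skipped on retry, and the shared rollback path could additionally mistake the MDT's low-valued init instants for a failed bootstrap. The hunks below address both; a short illustrative sketch follows each of the first two files' diffs.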

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index e55fb045e1e..7e78bddd875 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -57,7 +57,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLogCompactException;
 import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -88,6 +87,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
 /**
@@ -932,8 +932,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
     LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
         .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
+    boolean isMetadataTable = isMetadataTable(basePath);
     for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
-      if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
+      if (!isMetadataTable
+          && HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
         // do we need to handle failed rollback of a bootstrap
         rollbackFailedBootstrap();
@@ -954,7 +956,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
       // from the async indexer (`HoodieIndexer`).
       // TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks in the
       //  metadata table is landed.
-      if (HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())) {
+      if (isMetadataTable(metaClient.getBasePathV2().toString())) {
         return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
           if (curInstantTime.isPresent()) {
             return !entry.equals(curInstantTime.get());
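
Why the new !isMetadataTable guard in the first hunk matters: Hudi instant timestamps compare lexicographically, and the MDT's init instants derive from SOLO_COMMIT_TIMESTAMP (see createIndexInitTimestamp in the new test below), so they sort at or below the FULL_BOOTSTRAP_INSTANT_TS sentinel and would otherwise be misread as a failed bootstrap, triggering rollbackFailedBootstrap(). A minimal, self-contained sketch of that comparison; the literal timestamp values are illustrative assumptions, not taken from this commit:

public class InstantOrderSketch {
  public static void main(String[] args) {
    // Assumed values: SOLO_COMMIT_TIMESTAMP-derived instants and the
    // FULL_BOOTSTRAP_INSTANT_TS sentinel are zero-padded strings that sort
    // before any wall-clock commit time.
    String mdtInitInstant = "00000000000000010"; // hypothetical MDT init timestamp
    String fullBootstrapTs = "00000000000002";   // hypothetical sentinel value
    String regularCommit = "20230809230000000";  // commit time used in the new test

    // HoodieTimeline.compareTimestamps with LESSER_THAN_OR_EQUALS boils down
    // to a lexicographic comparison like this:
    System.out.println(mdtInitInstant.compareTo(fullBootstrapTs) <= 0); // true: trips the bootstrap branch
    System.out.println(regularCommit.compareTo(fullBootstrapTs) <= 0);  // false: normal commits unaffected
  }
}

Hoisting isMetadataTable(basePath) into a local boolean before the loop also avoids recomputing it for every rollback instant; the second hunk simply reuses the new static import.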
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4f965e587cb..74d8ae16176 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -112,7 +112,6 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRollbackTimestamp;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
 
 /**
@@ -257,10 +256,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       // check if any of the enabled partition types needs to be initialized
       // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler.
       if (!dataWriteConfig.isMetadataAsyncIndex()) {
-        Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-        LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions);
+        Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
+        LOG.info("Async metadata indexing disabled and following partitions already initialized: " + completedPartitions);
         this.enabledPartitionTypes.stream()
-            .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
+            .filter(p -> !completedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
             .forEach(partitionsToInit::add);
       }
 
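
The change above is the core of the fix: getInflightAndCompletedMetadataPartitions treated a partition whose initialization had merely started (inflight) as already done, so a crash while building record_index left it stuck. Checking only the completed set (getMetadataPartitions) lets the next write re-initialize it. A self-contained simulation of the selection logic, with plain strings standing in for MetadataPartitionType and HoodieTableConfig:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionInitSketch {
  public static void main(String[] args) {
    List<String> enabled = Arrays.asList("files", "record_index");
    Set<String> completed = new HashSet<>(Arrays.asList("files"));
    Set<String> inflight = new HashSet<>(Arrays.asList("record_index")); // init crashed midway

    // Old behavior: inflight and completed are both skipped, so the
    // half-initialized record_index partition is never retried.
    Set<String> inflightAndCompleted = new HashSet<>(completed);
    inflightAndCompleted.addAll(inflight);
    List<String> toInitOld = enabled.stream()
        .filter(p -> !inflightAndCompleted.contains(p) && !p.equals("files"))
        .collect(Collectors.toList());
    System.out.println(toInitOld); // [] -- stuck forever

    // Fixed behavior: only completed partitions are skipped ("files" is
    // excluded here to mirror the !MetadataPartitionType.FILES.equals(p) check).
    List<String> toInitNew = enabled.stream()
        .filter(p -> !completed.contains(p) && !p.equals("files"))
        .collect(Collectors.toList());
    System.out.println(toInitNew); // [record_index] -- re-initialized
  }
}

The test added below reproduces this scenario end to end: it reverts the record_index init delta commit to inflight and asserts that a later write re-initializes the index with only live records.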
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index d33cada74b6..464d47b2a27 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -75,6 +75,7 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -110,8 +111,10 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.util.Time;
@@ -160,10 +163,15 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_EXTENSION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_EXTENSION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.INFLIGHT_EXTENSION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REQUESTED_EXTENSION;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
 import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
 import static org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.METADATA_COMPACTION_TIME_SUFFIX;
+import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -870,7 +878,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     // Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets
     // for future upserts. so, renaming the file here to some temp name and later renaming it back to same name.
     java.nio.file.Path parentPath = Paths.get(metadataTableBasePath, METAFOLDER_NAME);
-    java.nio.file.Path metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
+    java.nio.file.Path metaFilePath = parentPath.resolve(metadataCompactionInstant + COMMIT_EXTENSION);
     java.nio.file.Path tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
     metaClient.reloadActiveTimeline();
     testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context));
@@ -903,7 +911,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       // Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets
       // for future upserts. so, renaming the file here to some temp name and later renaming it back to same name.
       parentPath = Paths.get(metadataTableBasePath, METAFOLDER_NAME);
-      metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
+      metaFilePath = parentPath.resolve(metadataCompactionInstant + COMMIT_EXTENSION);
       tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
 
       validateMetadata(testTable);
@@ -978,6 +986,115 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testMetadataRollbackDuringInit() throws Exception {
+    HoodieTableType tableType = COPY_ON_WRITE;
+    init(tableType, false);
+    writeConfig = getWriteConfigBuilder(false, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .build())
+        .build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // First write that will be rolled back
+    String newCommitTime1 = "20230809230000000";
+    List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime1, 100);
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      client.startCommitWithTime(newCommitTime1);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records1, 1), newCommitTime1);
+      client.commit(newCommitTime1, writeStatuses);
+    }
+
+    // Revert the first commit to inflight, and move the table to a state where MDT fails
+    // during the initialization of the second partition (record_index)
+    revertTableToInflightState(writeConfig);
+
+    // Second write
+    String newCommitTime2 = "20230809232000000";
+    List<HoodieRecord> records2 = dataGen.generateInserts(newCommitTime2, 20);
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      client.startCommitWithTime(newCommitTime2);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records2, 1), newCommitTime2);
+      client.commit(newCommitTime2, writeStatuses);
+    }
+
+    HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
+        context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+    Map<String, HoodieRecordGlobalLocation> result = metadataReader
+        .readRecordIndex(records1.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+    assertEquals(0, result.size(), "RI should not return entries that are rolled back.");
+    result = metadataReader
+        .readRecordIndex(records2.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+    assertEquals(records2.size(), result.size(), "RI should return entries in the commit.");
+  }
+
+  private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IOException {
+    String basePath = writeConfig.getBasePath();
+    String mdtBasePath = getMetadataTableBasePath(basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath(basePath)
+        .build();
+    HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath(mdtBasePath)
+        .build();
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieActiveTimeline mdtTimeline = mdtMetaClient.getActiveTimeline();
+    assertEquals(1, timeline.countInstants());
+    assertEquals(1, timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
+    assertEquals(3, mdtTimeline.countInstants());
+    assertEquals(3, mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
+    String mdtInitCommit2 = HoodieTableMetadataUtil.createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 1);
+    Pair<HoodieInstant, HoodieCommitMetadata> lastCommitMetadataWithValidData =
+        mdtTimeline.getLastCommitMetadataWithValidData().get();
+    String commit = lastCommitMetadataWithValidData.getLeft().getTimestamp();
+    assertTrue(timeline.getCommitsTimeline().containsInstant(commit));
+    assertTrue(mdtTimeline.getCommitsTimeline().containsInstant(commit));
+
+    // Transition the last commit to inflight in DT
+    deleteMetaFile(metaClient.getFs(), basePath, commit, COMMIT_EXTENSION);
+
+    // Remove the last commit and written data files in MDT
+    List<String> dataFiles = lastCommitMetadataWithValidData.getRight().getWriteStats().stream().map(
+        HoodieWriteStat::getPath).collect(Collectors.toList());
+
+    for (String relativeFilePath : dataFiles) {
+      deleteFileFromDfs(metaClient.getFs(), mdtBasePath + "/" + relativeFilePath);
+    }
+
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, commit, DELTA_COMMIT_EXTENSION);
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, commit, DELTA_COMMIT_EXTENSION + INFLIGHT_EXTENSION);
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, commit, DELTA_COMMIT_EXTENSION + REQUESTED_EXTENSION);
+
+    // Transition the second init commit for record_index partition to inflight in MDT
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION);
+    metaClient.getTableConfig().setMetadataPartitionState(
+        metaClient, MetadataPartitionType.RECORD_INDEX, false);
+    metaClient.getTableConfig().setMetadataPartitionsInflight(
+        metaClient, MetadataPartitionType.RECORD_INDEX);
+    timeline = metaClient.getActiveTimeline().reload();
+    mdtTimeline = mdtMetaClient.getActiveTimeline().reload();
+    assertEquals(commit, timeline.lastInstant().get().getTimestamp());
+    assertTrue(timeline.lastInstant().get().isInflight());
+    assertEquals(mdtInitCommit2, mdtTimeline.lastInstant().get().getTimestamp());
+    assertTrue(mdtTimeline.lastInstant().get().isInflight());
+  }
+
+  public static void deleteFileFromDfs(FileSystem fs, String targetPath) throws IOException {
+    if (fs.exists(new Path(targetPath))) {
+      fs.delete(new Path(targetPath), true);
+    }
+  }
+
+  public static void deleteMetaFile(FileSystem fs, String basePath, String instantTime, String suffix) throws IOException {
+    String targetPath = basePath + "/" + METAFOLDER_NAME + "/" + instantTime + suffix;
+    deleteFileFromDfs(fs, targetPath);
+  }
+
   /**
    * Test arguments - Table type, populate meta fields, exclude key from payload.
    */
@@ -2163,7 +2280,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
       // make all commits to inflight in metadata table. Still read should go through, just that it may not return any data.
       FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", commitTimestamps[0]);
-      FileCreateUtils.deleteDeltaCommit(basePath + " /.hoodie/metadata/", HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP);
+      FileCreateUtils.deleteDeltaCommit(basePath + " /.hoodie/metadata/", SOLO_COMMIT_TIMESTAMP);
       assertEquals(getAllFiles(metadata(client)).stream().map(p -> p.getName()).map(n -> FSUtils.getCommitTime(n)).collect(Collectors.toSet()).size(), 0);
     }
   }
