This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 89b8ae02bf49afe412b7472b22ad4ffaef116a06
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Thu Aug 10 19:17:07 2023 -0700

    [HUDI-6679] Fix initialization of metadata table partitions upon failure (#9419)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   8 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   7 +-
 .../functional/TestHoodieBackedMetadata.java       | 123 ++++++++++++++++++++-
 3 files changed, 128 insertions(+), 10 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index e55fb045e1e..7e78bddd875 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -57,7 +57,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLogCompactException;
 import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -88,6 +87,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
 /**
@@ -932,8 +932,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
     LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
         .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
+    boolean isMetadataTable = isMetadataTable(basePath);
     for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
-      if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
+      if (!isMetadataTable
+          && HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
         // do we need to handle failed rollback of a bootstrap
         rollbackFailedBootstrap();
@@ -954,7 +956,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
       // from the async indexer (`HoodieIndexer`).
       // TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks in the
       //  metadata table is landed.
-      if (HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())) {
+      if (isMetadataTable(metaClient.getBasePathV2().toString())) {
         return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
           if (curInstantTime.isPresent()) {
             return !entry.equals(curInstantTime.get());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4f965e587cb..74d8ae16176 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -112,7 +112,6 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRollbackTimestamp;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
 
 /**
@@ -257,10 +256,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       // check if any of the enabled partition types needs to be initialized
       // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler.
       if (!dataWriteConfig.isMetadataAsyncIndex()) {
-        Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-        LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions);
+        Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
+        LOG.info("Async metadata indexing disabled and following partitions already initialized: " + completedPartitions);
         this.enabledPartitionTypes.stream()
-            .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
+            .filter(p -> !completedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
             .forEach(partitionsToInit::add);
       }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index d33cada74b6..464d47b2a27 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -75,6 +75,7 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -110,8 +111,10 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.util.Time;
@@ -160,10 +163,15 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_EXTENSION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_EXTENSION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.INFLIGHT_EXTENSION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REQUESTED_EXTENSION;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
 import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
 import static org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.METADATA_COMPACTION_TIME_SUFFIX;
+import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -870,7 +878,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     // Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets
     // for future upserts. so, renaming the file here to some temp name and later renaming it back to same name.
     java.nio.file.Path parentPath = Paths.get(metadataTableBasePath, METAFOLDER_NAME);
-    java.nio.file.Path metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
+    java.nio.file.Path metaFilePath = parentPath.resolve(metadataCompactionInstant + COMMIT_EXTENSION);
     java.nio.file.Path tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
     metaClient.reloadActiveTimeline();
     testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context));
@@ -903,7 +911,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     // Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets
     // for future upserts. so, renaming the file here to some temp name and later renaming it back to same name.
     parentPath = Paths.get(metadataTableBasePath, METAFOLDER_NAME);
-    metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
+    metaFilePath = parentPath.resolve(metadataCompactionInstant + COMMIT_EXTENSION);
     tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
 
     validateMetadata(testTable);
@@ -978,6 +986,115 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testMetadataRollbackDuringInit() throws Exception {
+    HoodieTableType tableType = COPY_ON_WRITE;
+    init(tableType, false);
+    writeConfig = getWriteConfigBuilder(false, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .build())
+        .build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // First write that will be rolled back
+    String newCommitTime1 = "20230809230000000";
+    List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime1, 100);
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      client.startCommitWithTime(newCommitTime1);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records1, 1), newCommitTime1);
+      client.commit(newCommitTime1, writeStatuses);
+    }
+
+    // Revert the first commit to inflight, and move the table to a state where MDT fails
+    // during the initialization of the second partition (record_index)
+    revertTableToInflightState(writeConfig);
+
+    // Second write
+    String newCommitTime2 = "20230809232000000";
+    List<HoodieRecord> records2 = dataGen.generateInserts(newCommitTime2, 20);
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      client.startCommitWithTime(newCommitTime2);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records2, 1), newCommitTime2);
+      client.commit(newCommitTime2, writeStatuses);
+    }
+
+    HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
+        context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+    Map<String, HoodieRecordGlobalLocation> result = metadataReader
+        .readRecordIndex(records1.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+    assertEquals(0, result.size(), "RI should not return entries that are rolled back.");
+    result = metadataReader
+        .readRecordIndex(records2.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+    assertEquals(records2.size(), result.size(), "RI should return entries in the commit.");
+  }
+
+  private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IOException {
+    String basePath = writeConfig.getBasePath();
+    String mdtBasePath = getMetadataTableBasePath(basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath(basePath)
+        .build();
+    HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath(mdtBasePath)
+        .build();
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieActiveTimeline mdtTimeline = mdtMetaClient.getActiveTimeline();
+    assertEquals(1, timeline.countInstants());
+    assertEquals(1, timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
+    assertEquals(3, mdtTimeline.countInstants());
+    assertEquals(3, mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
+    String mdtInitCommit2 = HoodieTableMetadataUtil.createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 1);
+    Pair<HoodieInstant, HoodieCommitMetadata> lastCommitMetadataWithValidData =
+        mdtTimeline.getLastCommitMetadataWithValidData().get();
+    String commit = lastCommitMetadataWithValidData.getLeft().getTimestamp();
+    assertTrue(timeline.getCommitsTimeline().containsInstant(commit));
+    assertTrue(mdtTimeline.getCommitsTimeline().containsInstant(commit));
+
+    // Transition the last commit to inflight in DT
+    deleteMetaFile(metaClient.getFs(), basePath, commit, COMMIT_EXTENSION);
+
+    // Remove the last commit and written data files in MDT
+    List<String> dataFiles = lastCommitMetadataWithValidData.getRight().getWriteStats().stream().map(
+        HoodieWriteStat::getPath).collect(Collectors.toList());
+
+    for (String relativeFilePath : dataFiles) {
+      deleteFileFromDfs(metaClient.getFs(), mdtBasePath + "/" + relativeFilePath);
+    }
+
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, commit, DELTA_COMMIT_EXTENSION);
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, commit, DELTA_COMMIT_EXTENSION + INFLIGHT_EXTENSION);
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, commit, DELTA_COMMIT_EXTENSION + REQUESTED_EXTENSION);
+
+    // Transition the second init commit for record_index partition to inflight in MDT
+    deleteMetaFile(metaClient.getFs(), mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION);
+    metaClient.getTableConfig().setMetadataPartitionState(
+        metaClient, MetadataPartitionType.RECORD_INDEX, false);
+    metaClient.getTableConfig().setMetadataPartitionsInflight(
+        metaClient, MetadataPartitionType.RECORD_INDEX);
+    timeline = metaClient.getActiveTimeline().reload();
+    mdtTimeline = mdtMetaClient.getActiveTimeline().reload();
+    assertEquals(commit, timeline.lastInstant().get().getTimestamp());
+    assertTrue(timeline.lastInstant().get().isInflight());
+    assertEquals(mdtInitCommit2, mdtTimeline.lastInstant().get().getTimestamp());
+    assertTrue(mdtTimeline.lastInstant().get().isInflight());
+  }
+
+  public static void deleteFileFromDfs(FileSystem fs, String targetPath) throws IOException {
+    if (fs.exists(new Path(targetPath))) {
+      fs.delete(new Path(targetPath), true);
+    }
+  }
+
+  public static void deleteMetaFile(FileSystem fs, String basePath, String instantTime, String suffix) throws IOException {
+    String targetPath = basePath + "/" + METAFOLDER_NAME + "/" + instantTime + suffix;
+    deleteFileFromDfs(fs, targetPath);
+  }
+
   /**
    * Test arguments - Table type, populate meta fields, exclude key from payload.
    */
@@ -2163,7 +2280,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     // make all commits to inflight in metadata table. Still read should go through, just that it may not return any data.
     FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", commitTimestamps[0]);
-    FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP);
+    FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", SOLO_COMMIT_TIMESTAMP);
     assertEquals(getAllFiles(metadata(client)).stream().map(p -> p.getName()).map(n -> FSUtils.getCommitTime(n)).collect(Collectors.toSet()).size(), 0);
   }
 }
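A note on the core change in HoodieBackedTableMetadataWriter, with a minimal standalone sketch (the enum and method names below are illustrative stand-ins, not the Hudi classes touched by this patch): previously the writer skipped any enabled partition whose initialization was inflight or completed, so a metadata table partition such as record_index left inflight by a crash was never re-initialized; the patch skips only completed partitions. FILES is excluded from this filter in both versions.

// Illustrative sketch only -- hypothetical stand-ins, not Hudi code.
import java.util.EnumSet;
import java.util.Set;

class PartitionInitSketch {
  enum PartitionType { FILES, RECORD_INDEX }

  // Old behavior: inflight partitions were treated as initialized and skipped.
  static Set<PartitionType> toInitBefore(Set<PartitionType> enabled,
                                         Set<PartitionType> inflightAndCompleted) {
    Set<PartitionType> toInit = EnumSet.noneOf(PartitionType.class);
    for (PartitionType p : enabled) {
      if (!inflightAndCompleted.contains(p) && p != PartitionType.FILES) {
        toInit.add(p);
      }
    }
    return toInit;
  }

  // New behavior: only completed partitions are skipped, so a partition left
  // inflight by a failed initialization is re-initialized on the next write.
  static Set<PartitionType> toInitAfter(Set<PartitionType> enabled,
                                        Set<PartitionType> completed) {
    Set<PartitionType> toInit = EnumSet.noneOf(PartitionType.class);
    for (PartitionType p : enabled) {
      if (!completed.contains(p) && p != PartitionType.FILES) {
        toInit.add(p);
      }
    }
    return toInit;
  }

  public static void main(String[] args) {
    Set<PartitionType> enabled = EnumSet.allOf(PartitionType.class);
    // record_index initialization crashed mid-way: inflight but not completed.
    Set<PartitionType> completed = EnumSet.of(PartitionType.FILES);
    Set<PartitionType> inflightAndCompleted = EnumSet.allOf(PartitionType.class);
    System.out.println(toInitBefore(enabled, inflightAndCompleted)); // [] -- never retried
    System.out.println(toInitAfter(enabled, completed));             // [RECORD_INDEX] -- retried
  }
}

The isMetadataTable guard added in BaseHoodieTableServiceClient complements this by keeping the failed-bootstrap rollback path from running against the metadata table itself; testMetadataRollbackDuringInit above exercises the failure and retry end to end.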