This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6d3311c34c8 [HUDI-6443] Support delete_partition, insert_overwrite/table with record-level index (#9055) 6d3311c34c8 is described below commit 6d3311c34c8548339aee8c3708fe50855a04afb8 Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com> AuthorDate: Mon Jul 10 01:24:44 2023 +0800 [HUDI-6443] Support delete_partition, insert_overwrite/table with record-level index (#9055) - Support delete_partition, insert_overwrite and insert_overwrite_table with record-level index. The metadata records should be updated accordingly: (1) all records in the deleted partition(s) should be deleted from RLI (for the delete_partition operation); (2) newly inserted records should be present in RLI; (3) old records in the affected partitions should be removed from RLI; (4) old records that happen to have the same record key as the new inserts won't be removed from RLI - their entries will be updated. --------- Co-authored-by: sivabalan <n.siv...@gmail.com> --- .../metadata/HoodieBackedTableMetadataWriter.java | 65 ++++++++++++++++++---- .../hudi/functional/TestRecordLevelIndex.scala | 28 +++++++++- 2 files changed, 80 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 97d1ce5e8b2..df73145a1bb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -42,7 +42,9 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordDelegate; import org.apache.hudi.common.model.HoodieRecordLocation; +import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; @@ -477,7 +479,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta + partitions.size() + " partitions"); // Collect record keys from the files in parallel - HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs); + HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, false); records.persist("MEMORY_AND_DISK_SER"); final long recordCount = records.count(); @@ -495,7 +497,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * Read the record keys from base files in partitions and return records. */ private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext, - List<Pair<String, String>> partitionBaseFilePairs) { + List<Pair<String, String>> partitionBaseFilePairs, + boolean forDelete) { if (partitionBaseFilePairs.isEmpty()) { return engineContext.emptyHoodieData(); } @@ -524,8 +527,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta @Override public HoodieRecord next() { - return HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, - instantTime); + return forDelete + ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next()) + : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime); } }; }); @@ -872,9 +876,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. 
Hence, we cannot yet move this code // to the HoodieTableMetadataUtil class in hudi-common. - if (writeStatus != null && !writeStatus.isEmpty()) { - partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, getRecordIndexUpdates(writeStatus)); - } + HoodieData<HoodieRecord> updatesFromWriteStatuses = getRecordIndexUpdates(writeStatus); + HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpdates(updatesFromWriteStatuses, commitMetadata); + partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, updatesFromWriteStatuses.union(additionalUpdates)); + return partitionToRecordMap; }); closeInternal(); @@ -884,7 +889,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * Update from {@code HoodieCleanMetadata}. * * @param cleanMetadata {@code HoodieCleanMetadata} - * @param instantTime Timestamp at which the clean was completed + * @param instantTime Timestamp at which the clean was completed */ @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { @@ -897,7 +902,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * Update from {@code HoodieRestoreMetadata}. * * @param restoreMetadata {@code HoodieRestoreMetadata} - * @param instantTime Timestamp at which the restore was performed + * @param instantTime Timestamp at which the restore was performed */ @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { @@ -911,7 +916,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * Update from {@code HoodieRollbackMetadata}. 
* * @param rollbackMetadata {@code HoodieRollbackMetadata} - * @param instantTime Timestamp at which the rollback was performed + * @param instantTime Timestamp at which the rollback was performed */ @Override public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { @@ -1225,6 +1230,46 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta .filter(Objects::nonNull); } + private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) { + final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, + dataMetaClient.getActiveTimeline(), metadata); + List<Pair<String, String>> partitionBaseFilePairs = replaceCommitMetadata + .getPartitionToReplaceFileIds() + .keySet().stream().flatMap(partition + -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f.getFileName()))) + .collect(Collectors.toList()); + + return readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, true); + } + + private HoodieData<HoodieRecord> getRecordIndexAdditionalUpdates(HoodieData<HoodieRecord> updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) { + WriteOperationType operationType = commitMetadata.getOperationType(); + if (operationType == WriteOperationType.INSERT_OVERWRITE) { + // load existing records from replaced filegroups and left anti join overwriting records + // return partition-level unmatched records (with newLocation being null) to be deleted from RLI + return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata) + .mapToPair(r -> Pair.of(r.getKey(), r)) + .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r -> Pair.of(r.getKey(), r))) + .values() + .filter(p -> !p.getRight().isPresent()) + .map(Pair::getLeft); + } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) { + // load existing records from replaced filegroups and left anti join overwriting records + // return 
globally unmatched records (with newLocation being null) to be deleted from RLI + return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata) + .mapToPair(r -> Pair.of(r.getRecordKey(), r)) + .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r -> Pair.of(r.getRecordKey(), r))) + .values() + .filter(p -> !p.getRight().isPresent()) + .map(Pair::getLeft); + } else if (operationType == WriteOperationType.DELETE_PARTITION) { + // all records from the target partition(s) to be deleted from RLI + return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata); + } else { + return engineContext.emptyHoodieData(); + } + } + protected void closeInternal() { try { close(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala index e9100a518bf..614a412c4a5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala @@ -25,8 +25,8 @@ import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.utils.MetadataConversionUtils import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.model.{ActionType, HoodieBaseFile, HoodieCommitMetadata, HoodieTableType, WriteOperationType} -import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.model._ +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieWriteConfig} @@ -108,7 +108,6 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase { saveMode = SaveMode.Append) } - @Disabled("needs delete support") @ParameterizedTest @CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false")) def testRLIBulkInsertThenInsertOverwrite(tableType: HoodieTableType, enableRowWriter: Boolean): Unit = { @@ -200,6 +199,26 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase { validateDataAndRecordIndices(hudiOpts) } + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testRLIWithDeletePartition(tableType: HoodieTableType): Unit = { + val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()) + val latestSnapshot = doWriteAndValidateDataAndRecordIndex(hudiOpts, + operation = DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + + Using(getHoodieWriteClient(getWriteConfig(hudiOpts))) { client => + val commitTime = client.startCommit + client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION) + val deletingPartition = dataGen.getPartitionPaths.last + val partitionList = Collections.singletonList(deletingPartition) + client.deletePartitions(partitionList, commitTime) + + val deletedDf = latestSnapshot.filter(s"partition = $deletingPartition") + validateDataAndRecordIndices(hudiOpts, deletedDf) + } + } + @ParameterizedTest @EnumSource(classOf[HoodieTableType]) def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = { @@ -503,15 +522,18 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase { latestBatch = recordsToStrings(dataGen.generateInserts(getInstantTime(), 5)).asScala } val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 2)) + latestBatchDf.cache() latestBatchDf.write.format("org.apache.hudi") .options(hudiOpts) .option(DataSourceWriteOptions.OPERATION.key, operation) .mode(saveMode) .save(basePath) val deletedDf = 
calculateMergedDf(latestBatchDf, operation) + deletedDf.cache() if (validate) { validateDataAndRecordIndices(hudiOpts, deletedDf) } + deletedDf.unpersist() latestBatchDf }