This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 1d5f2f7c63d  [HUDI-6376] Support for deletes in HUDI Indexes including metadata table record index. (#9058)
1d5f2f7c63d is described below

commit 1d5f2f7c63de441b9f475dd7ba4cf1540e0f9c42
Author: Prashant Wason <pwa...@uber.com>
AuthorDate: Fri Jun 30 09:12:36 2023 -0700

    [HUDI-6376] Support for deletes in HUDI Indexes including metadata table record index. (#9058)

    * [HUDI-6376] Support for deletes in HUDI Indexes including metadata table record index.

    ---------

    Co-authored-by: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  4 +
 .../functional/TestHoodieBackedMetadata.java       | 94 ++++++++++++++++++++++
 .../hudi/client/functional/TestHoodieIndex.java    | 61 ++++++++++++++
 .../index/hbase/TestSparkHoodieHBaseIndex.java     |  3 +-
 .../org/apache/hudi/common/model/HoodieRecord.java | 13 ++-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  6 +-
 .../hudi/metadata/HoodieMetadataPayload.java       | 29 ++++++-
 7 files changed, 202 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index cfe11b1fd8d..8c4b0bc18d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -314,6 +314,10 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
         recordsWritten++;
       } else {
         recordsDeleted++;
+        // Clear the new location as the record was deleted
+        newRecord.unseal();
+        newRecord.clearNewLocation();
+        newRecord.seal();
       }
       writeStatus.markSuccess(newRecord, recordMetadata);
      // deflate record payload after recording success. This will help users access payload as a
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 075afd61eb1..a1657c204b8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -103,6 +104,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.avro.Schema;
@@ -3068,6 +3070,98 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(client);
   }
 
+  @Test
+  public void testDeleteWithRecordIndex() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE, true);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withEnableRecordIndex(true).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.RECORD_INDEX).build())
+        .build();
+
+    String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    String secondCommitTime;
+    List<HoodieRecord> allRecords;
+    List<String> keysToDelete;
+    List<HoodieRecord> recordsToDelete;
+
+    // Initialize the dataset and add some commits.
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // First commit
+      List<HoodieRecord> firstBatchOfRecords = dataGen.generateInserts(firstCommitTime, 10);
+      client.startCommitWithTime(firstCommitTime);
+      client.insert(jsc.parallelize(firstBatchOfRecords, 1), firstCommitTime).collect();
+
+      // Records were inserted and RI is initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), "RI is disabled");
+      assertEquals(firstBatchOfRecords.size(),
+          HoodieClientTestUtils.readCommit(writeConfig.getBasePath(), engineContext.getSqlContext(), metaClient.reloadActiveTimeline(), firstCommitTime).count());
+
+      // Add another batch of records
+      secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> secondBatchOfRecords = dataGen.generateInserts(secondCommitTime, 5);
+      client.startCommitWithTime(secondCommitTime);
+      client.bulkInsert(jsc.parallelize(secondBatchOfRecords, 1), secondCommitTime).collect();
+
+      assertEquals(secondBatchOfRecords.size(),
+          HoodieClientTestUtils.readCommit(writeConfig.getBasePath(), engineContext.getSqlContext(), metaClient.reloadActiveTimeline(), secondCommitTime).count());
+
+      allRecords = new ArrayList<>(firstBatchOfRecords);
+      allRecords.addAll(secondBatchOfRecords);
+
+      // RI should have created mappings for all the records inserted above
+      HoodieTableMetadata metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      Map<String, HoodieRecordGlobalLocation> result = metadataReader
+          .readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size(), result.size(), "RI should have a mapping for every inserted record");
+
+      // Delete some records from each commit. This should also remove the RI mapping.
+      recordsToDelete = new ArrayList<>(firstBatchOfRecords.subList(0, 3)); // copy so addAll does not mutate the backing list
+      recordsToDelete.addAll(secondBatchOfRecords.subList(0, 2));
+      keysToDelete = recordsToDelete.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+
+      String deleteTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(deleteTime);
+      client.delete(jsc.parallelize(recordsToDelete, 1).map(HoodieRecord::getKey), deleteTime);
+
+      // RI should not return mappings for deleted records
+      metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      result = metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size() - recordsToDelete.size(), result.size(), "RI should not have mapping for deleted records");
+      result.keySet().forEach(mappingKey -> assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for deleted records"));
+    }
+
+    // Compaction should work too by removing the deleted keys from the base files.
+    // To perform compaction, we need to create a new write client, as compaction is attempted before any operations in the write client.
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // An empty delete to trigger compaction
+      String deleteTime = client.startCommit();
+      client.delete(jsc.emptyRDD(), deleteTime);
+
+      HoodieTableMetadata metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      assertTrue(metadataReader.getLatestCompactionTime().isPresent(), "Compaction should have taken place on MDT");
+
+      // RI should not return mappings for deleted records
+      metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      Map<String, HoodieRecordGlobalLocation> result = metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size() - keysToDelete.size(), result.size(), "RI should not have mapping for deleted records");
+      result.keySet().forEach(mappingKey -> assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for deleted records"));
+
+      // Adding records with the same keys after delete should work
+      String reinsertTime = client.startCommit();
+      client.upsert(jsc.parallelize(recordsToDelete, 1), reinsertTime).collect();
+
+      // New mappings should have been created for re-inserted records and should map to the new commit time
+      metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      result = metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size(), result.size(), "RI should have mappings for re-inserted records");
+      for (String reInsertedKey : keysToDelete) {
+        assertEquals(reinsertTime, result.get(reInsertedKey).getInstantTime(), "RI mapping for re-inserted keys should have new commit time");
+      }
+    }
+  }
+
   private void validateMetadata(SparkRDDWriteClient testClient) throws IOException {
     validateMetadata(testClient, Option.empty());
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index edf4327bfaf..936f8b7a29a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -78,6 +78,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import scala.Tuple2;
@@ -555,6 +556,66 @@
     assertFalse(HoodieIndexUtils.checkIfValidCommit(timeline, instantTimestampSec));
   }
 
+  @Test
+  public void testDelete() throws Exception {
+    setUp(IndexType.INMEMORY, true, false);
+
+    // Insert records
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> records = getInserts();
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    writeClient.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(writeRecords, newCommitTime);
+    assertNoWriteErrors(writeStatuses.collect());
+    writeClient.commit(newCommitTime, writeStatuses);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+    // Now tagLocation for these records; the index should tag them correctly
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD = tagLocation(hoodieTable.getIndex(), writeRecords, hoodieTable);
+    Map<String, String> recordKeyToPartitionPathMap = new HashMap<>();
+    List<HoodieRecord> hoodieRecords = writeRecords.collect();
+    hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
+
+    assertEquals(records.size(), javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+    assertEquals(records.size(), javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+    assertEquals(records.size(), javaRDD.filter(record -> (record.getCurrentLocation() != null
+        && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+    javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
+
+    JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(HoodieRecord::getKey);
+    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
+    List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
+    assertEquals(records.size(), recordLocations.collect().size());
+    assertEquals(records.size(), recordLocations.map(record -> record._1).distinct().count());
+    recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
+    recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
+
+    // Delete some of the keys
+    final int numDeletes = records.size() / 2;
+    List<HoodieKey> keysToDelete = records.stream().limit(numDeletes).map(r -> new HoodieKey(r.getRecordKey(), r.getPartitionPath())).collect(Collectors.toList());
+    String deleteCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    writeClient.startCommitWithTime(deleteCommitTime);
+    writeStatuses = writeClient.delete(jsc.parallelize(keysToDelete, 1), deleteCommitTime);
+    assertNoWriteErrors(writeStatuses.collect());
+    writeClient.commit(deleteCommitTime, writeStatuses);
+
+    // Deleted records should not be found in the index
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    javaRDD = tagLocation(hoodieTable.getIndex(), jsc.parallelize(records.subList(0, numDeletes)), hoodieTable);
+    assertEquals(0, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+    assertEquals(numDeletes, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+
+    // Other records should be found
+    javaRDD = tagLocation(hoodieTable.getIndex(), jsc.parallelize(records.subList(numDeletes, records.size())), hoodieTable);
+    assertEquals(records.size() - numDeletes, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+    assertEquals(records.size() - numDeletes, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+  }
+
   private HoodieWriteConfig.Builder getConfigBuilder() {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index d37bee23688..be663d05bfe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -63,6 +63,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -312,7 +313,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
         && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
   }
 
-  @Test
+  @Disabled("HUDI-6460")
   public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws Exception {
     final int numRecords = 10;
     final String oldPartitionPath = "1970/01/01";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 1ee019eb713..2a519d1334b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -233,11 +233,10 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   /**
    * Sets the new currentLocation of the record, after being written. This again should happen exactly-once.
    */
-  public HoodieRecord setNewLocation(HoodieRecordLocation location) {
+  public void setNewLocation(HoodieRecordLocation location) {
     checkState();
     assert newLocation == null;
     this.newLocation = location;
-    return this;
   }
 
   @Nullable
@@ -311,6 +310,16 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
 
   protected abstract T readRecordPayload(Kryo kryo, Input input);
 
+  /**
+   * Clears the new currentLocation of the record.
+   *
+   * This is required in the delete path so that the index can track that this record was deleted.
+   */
+  public void clearNewLocation() {
+    checkState();
+    this.newLocation = null;
+  }
+
   /**
    * NOTE: This method is declared final to make sure there's no polymorphism and therefore
    *       JIT compiler could perform more aggressive optimizations
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 38da0cac7a8..21983f31f66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -328,8 +328,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
           records.merge(
               logRecord.getRecordKey(),
               logRecord,
-              (oldRecord, newRecord) ->
-                  new HoodieAvroRecord<>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData()))
+              (oldRecord, newRecord) -> {
+                HoodieMetadataPayload mergedPayload = newRecord.getData().preCombine(oldRecord.getData());
+                return mergedPayload.isDeleted() ? null : new HoodieAvroRecord<>(oldRecord.getKey(), mergedPayload);
+              }
           ));
       timings.add(timer.endTimer());
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 888833b45a0..9b4698c0450 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -209,9 +210,10 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   private HoodieMetadataBloomFilter bloomFilterMetadata = null;
   private HoodieMetadataColumnStats columnStatMetadata = null;
   private HoodieRecordIndexInfo recordIndexMetadata;
+  private boolean isDeletedRecord = false;
 
-  public HoodieMetadataPayload(GenericRecord record, Comparable<?> orderingVal) {
-    this(Option.of(record));
+  public HoodieMetadataPayload(@Nullable GenericRecord record, Comparable<?> orderingVal) {
+    this(Option.ofNullable(record));
   }
 
   public HoodieMetadataPayload(Option<GenericRecord> recordOpt) {
@@ -283,6 +285,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
             Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILE_INDEX).toString()),
             Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()));
       }
+    } else {
+      this.isDeletedRecord = true;
     }
   }
 
@@ -403,6 +407,16 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
+    if (this.isDeletedRecord) {
+      // This happens when a record has been deleted. The previous version of the record should be ignored.
+      return this;
+    }
+    if (previousRecord.isDeletedRecord) {
+      // This happens when a record with the same key is added back after a deletion.
+      return this;
+    }
+
+    // Validate the merge scenario: only records of the same type and key can be combined.
     checkArgument(previousRecord.type == type,
         "Cannot combine " + previousRecord.type + " with " + type);
     checkArgument(previousRecord.key.equals(key),
@@ -419,6 +433,11 @@
       case METADATA_TYPE_COLUMN_STATS:
         return new HoodieMetadataPayload(key, combineColumnStatsMetadata(previousRecord));
       case METADATA_TYPE_RECORD_INDEX:
+        // There is always a single mapping, and the latest mapping is maintained.
+        // Mappings in the record index can change in two scenarios:
+        //   1. A key is deleted from the dataset and then added again (new fileID)
+        //   2. A key moves to a different file due to clustering
+        // No need to merge with the previous record-index entry; always pick the latest payload.
         return this;
       default:
@@ -455,7 +474,7 @@
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schemaIgnored, Properties propertiesIgnored) throws IOException {
-    if (key == null) {
+    if (key == null || this.isDeletedRecord) {
       return Option.empty();
     }
@@ -767,6 +786,10 @@
     return new HoodieRecordGlobalLocation(partition, HoodieActiveTimeline.formatDate(instantDate), fileId);
   }
 
+  public boolean isDeleted() {
+    return isDeletedRecord;
+  }
+
   @Override
   public boolean equals(Object other) {
     if (other == this) {
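A note on the HoodieMergeHandle change above: HoodieRecord is sealed against mutation, so the merge handle must bracket clearNewLocation() with unseal()/seal(). The standalone sketch below models that lifecycle with a simplified record class; SealedRecord and its fields are hypothetical stand-ins for HoodieRecord, not Hudi API.

// A minimal model of the seal/unseal lifecycle used in the diff above.
// Only the sealing behavior and the "null new location means deleted" convention are modeled.
public class SealedRecord {
  private boolean sealed = true;
  private String newLocation; // e.g. "fileId@instantTime"; null means no location (deleted)

  private void checkState() {
    if (sealed) {
      throw new UnsupportedOperationException("Not allowed to modify a sealed record");
    }
  }

  public void unseal() { this.sealed = false; }

  public void seal() { this.sealed = true; }

  public void setNewLocation(String location) {
    checkState();
    assert newLocation == null; // exactly-once, mirroring HoodieRecord.setNewLocation
    this.newLocation = location;
  }

  // Mirrors HoodieRecord.clearNewLocation: a null new location marks the record deleted.
  public void clearNewLocation() {
    checkState();
    this.newLocation = null;
  }

  public boolean hasNewLocation() { return newLocation != null; }

  public static void main(String[] args) {
    SealedRecord record = new SealedRecord();
    record.unseal();
    record.setNewLocation("file-0001@t10");
    record.seal();

    // The merge handle decides the record is deleted: clear the location under an unseal/seal pair.
    record.unseal();
    record.clearNewLocation();
    record.seal();

    // The index update path can now treat "no new location" as a delete.
    System.out.println("has new location: " + record.hasNewLocation()); // false
  }
}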
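The preCombine rules added to HoodieMetadataPayload reduce to: the newer entry always wins, and a tombstone on either side resolves in favor of the newer entry. Here is a minimal sketch of those two rules, using a simplified MetadataEntry class in place of HoodieMetadataPayload (hypothetical, for illustration only):

// A minimal model of the tombstone-aware preCombine above.
public final class MetadataEntry {
  final String key;
  final boolean isDeleted; // corresponds to isDeletedRecord in the diff
  final String location;   // null for tombstones

  MetadataEntry(String key, boolean isDeleted, String location) {
    this.key = key;
    this.isDeleted = isDeleted;
    this.location = location;
  }

  // "this" is the newer entry and "previous" the older one, matching preCombine's contract.
  MetadataEntry preCombine(MetadataEntry previous) {
    if (this.isDeleted) {
      return this; // a newer delete wins: the previous mapping is dropped
    }
    if (previous.isDeleted) {
      return this; // re-insert after a delete: the fresh mapping wins
    }
    // Record-index entries always keep the latest mapping, so no field-level merge is needed.
    return this;
  }

  public static void main(String[] args) {
    MetadataEntry insert = new MetadataEntry("key1", false, "file-0001@t10");
    MetadataEntry delete = new MetadataEntry("key1", true, null);
    MetadataEntry reinsert = new MetadataEntry("key1", false, "file-0002@t30");

    System.out.println(delete.preCombine(insert).isDeleted);  // true  (delete shadows the insert)
    System.out.println(reinsert.preCombine(delete).location); // file-0002@t30 (re-insert wins)
  }
}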
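Finally, the reader-side change in HoodieBackedTableMetadata leans on documented java.util.Map.merge semantics: if the remapping function returns null, the mapping is removed from the map. That is how a merged tombstone drops a deleted key from the result before it reaches callers such as readRecordIndex. A self-contained demonstration, with plain strings standing in for Hudi records:

import java.util.HashMap;
import java.util.Map;

// Demonstrates the Map.merge behavior the reader relies on: returning null from
// the remapping function removes the key, which is how merged tombstones drop
// deleted record-index entries from the returned map.
public class MergeNullDemo {
  public static void main(String[] args) {
    Map<String, String> records = new HashMap<>();
    records.put("key1", "mapping@t10");

    // A later log entry for key1 is a tombstone; the merged result is "deleted",
    // so the lambda returns null and Map.merge removes key1 entirely.
    records.merge("key1", "TOMBSTONE", (oldVal, newVal) -> {
      String merged = newVal; // the newer entry wins, as in preCombine above
      boolean deleted = merged.equals("TOMBSTONE");
      return deleted ? null : merged;
    });

    System.out.println(records.containsKey("key1")); // false
  }
}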