This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5f2f7c63d [HUDI-6376] Support for deletes in HUDI Indexes including metadata table record index. (#9058)
1d5f2f7c63d is described below

commit 1d5f2f7c63de441b9f475dd7ba4cf1540e0f9c42
Author: Prashant Wason <pwa...@uber.com>
AuthorDate: Fri Jun 30 09:12:36 2023 -0700

    [HUDI-6376] Support for deletes in HUDI Indexes including metadata table record index. (#9058)
    
    * [HUDI-6376] Support for deletes in HUDI Indexes including metadata table record index.
    
    ---------
    
    Co-authored-by: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  4 +
 .../functional/TestHoodieBackedMetadata.java       | 94 ++++++++++++++++++++++
 .../hudi/client/functional/TestHoodieIndex.java    | 61 ++++++++++++++
 .../index/hbase/TestSparkHoodieHBaseIndex.java     |  3 +-
 .../org/apache/hudi/common/model/HoodieRecord.java | 13 ++-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  6 +-
 .../hudi/metadata/HoodieMetadataPayload.java       | 29 ++++++-
 7 files changed, 202 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index cfe11b1fd8d..8c4b0bc18d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -314,6 +314,10 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
         recordsWritten++;
       } else {
         recordsDeleted++;
+        // Clear the new location as the record was deleted
+        newRecord.unseal();
+        newRecord.clearNewLocation();
+        newRecord.seal();
       }
       writeStatus.markSuccess(newRecord, recordMetadata);
       // deflate record payload after recording success. This will help users access payload as a
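
The unseal/clearNewLocation/seal sequence added above follows HoodieRecord's mutation-guard convention: a sealed record rejects location changes until it is explicitly unsealed. A minimal, self-contained sketch of that pattern (the class below is illustrative only, not Hudi's actual implementation):

    // Illustrative stand-in for the sealed-record pattern; not part of the patch.
    class SealedRecordSketch {
      private boolean sealed = true;
      private String newLocation;

      void unseal() { sealed = false; }

      void seal() { sealed = true; }

      void setNewLocation(String location) {
        checkState();
        newLocation = location;
      }

      // Delete path: drop the pending location so downstream index updates see the record as deleted.
      void clearNewLocation() {
        checkState();
        newLocation = null;
      }

      private void checkState() {
        if (sealed) {
          throw new UnsupportedOperationException("Unseal the record before modifying it");
        }
      }
    }
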
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 075afd61eb1..a1657c204b8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -103,6 +104,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.avro.Schema;
@@ -3068,6 +3070,98 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(client);
   }
 
+  @Test
+  public void testDeleteWithRecordIndex() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE, true);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withEnableRecordIndex(true).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.RECORD_INDEX).build())
+        .build();
+
+    String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    String secondCommitTime;
+    List<HoodieRecord> allRecords;
+    List<String> keysToDelete;
+    List<HoodieRecord> recordsToDelete;
+
+    // Initialize the dataset and add some commits.
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // First commit
+      List<HoodieRecord> firstBatchOfrecords = dataGen.generateInserts(firstCommitTime, 10);
+      client.startCommitWithTime(firstCommitTime);
+      client.insert(jsc.parallelize(firstBatchOfrecords, 1), firstCommitTime).collect();
+
+      // Records got inserted and RI is initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), "RI is disabled");
+      assertEquals(firstBatchOfrecords.size(),
+          HoodieClientTestUtils.readCommit(writeConfig.getBasePath(), engineContext.getSqlContext(), metaClient.reloadActiveTimeline(), firstCommitTime).count());
+
+      // Another batch of records added
+      secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> secondBatchOfrecords = dataGen.generateInserts(secondCommitTime, 5);
+      client.startCommitWithTime(secondCommitTime);
+      client.bulkInsert(jsc.parallelize(secondBatchOfrecords, 1), secondCommitTime).collect();
+
+      assertEquals(secondBatchOfrecords.size(),
+          HoodieClientTestUtils.readCommit(writeConfig.getBasePath(), engineContext.getSqlContext(), metaClient.reloadActiveTimeline(), secondCommitTime).count());
+
+      allRecords = new ArrayList<>(firstBatchOfrecords);
+      allRecords.addAll(secondBatchOfrecords);
+
+      // RI should have created mappings for all the records inserted above
+      HoodieTableMetadata metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      Map<String, HoodieRecordGlobalLocation> result = metadataReader
+          .readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size(), result.size(), "RI should have mapping for all the records in firstCommit");
+
+      // Delete some records from each commit. This should also remove the RI mapping.
+      recordsToDelete = firstBatchOfrecords.subList(0, 3);
+      recordsToDelete.addAll(secondBatchOfrecords.subList(0, 2));
+      keysToDelete = recordsToDelete.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+
+      String deleteTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(deleteTime);
+      client.delete(jsc.parallelize(recordsToDelete, 1).map(HoodieRecord::getKey), deleteTime);
+
+      // RI should not return mappings for deleted records
+      metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      result = metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size() - recordsToDelete.size(), result.size(), "RI should not have mapping for deleted records");
+      result.keySet().forEach(mappingKey -> assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for deleted records"));
+    }
+
+    // Compaction should work too by removing the deleted keys from the base files
+    // To perform compaction, we need to create a new write client as compaction is attempted before any operations in write client
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // An empty delete to trigger compaction
+      String deleteTime = client.startCommit();
+      client.delete(jsc.emptyRDD(), deleteTime);
+
+      HoodieTableMetadata metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      assertTrue(metadataReader.getLatestCompactionTime().isPresent(), "Compaction should have taken place on MDT");
+
+      // RI should not return mappings for deleted records
+      metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      Map<String, HoodieRecordGlobalLocation> result = metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size() - keysToDelete.size(), result.size(), "RI should not have mapping for deleted records");
+      result.keySet().forEach(mappingKey -> assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for deleted records"));
+
+      // Adding records with the same keys after delete should work
+      String reinsertTime = client.startCommit();
+      client.upsert(jsc.parallelize(recordsToDelete, 1), reinsertTime).collect();
+
+      // New mappings should have been created for re-inserted records and should map to the new commit time
+      metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      result = metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(allRecords.size(), result.size(), "RI should have mappings for re-inserted records");
+      for (String reInsertedKey : keysToDelete) {
+        assertEquals(reinsertTime, result.get(reInsertedKey).getInstantTime(), "RI mapping for re-inserted keys should have new commit time");
+      }
+    }
+  }
+
   private void validateMetadata(SparkRDDWriteClient testClient) throws IOException {
     validateMetadata(testClient, Option.empty());
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index edf4327bfaf..936f8b7a29a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -78,6 +78,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import scala.Tuple2;
@@ -555,6 +556,66 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
     assertFalse(HoodieIndexUtils.checkIfValidCommit(timeline, instantTimestampSec));
   }
 
+  @Test
+  public void testDelete() throws Exception {
+    setUp(IndexType.INMEMORY, true, false);
+
+    // Insert records
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> records = getInserts();
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    writeClient.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+    assertNoWriteErrors(writeStatues.collect());
+    writeClient.commit(newCommitTime, writeStatues);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+    // Now tagLocation for these records, index should tag them correctly
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD = tagLocation(hoodieTable.getIndex(), writeRecords, hoodieTable);
+    Map<String, String> recordKeyToPartitionPathMap = new HashMap<>();
+    List<HoodieRecord> hoodieRecords = writeRecords.collect();
+    hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
+
+    assertEquals(records.size(), javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+    assertEquals(records.size(), javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+    assertEquals(records.size(), javaRDD.filter(record -> (record.getCurrentLocation() != null
+        && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+    javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
+
+    JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(HoodieRecord::getKey);
+    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
+    List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
+    assertEquals(records.size(), recordLocations.collect().size());
+    assertEquals(records.size(), recordLocations.map(record -> record._1).distinct().count());
+    recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
+    recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
+
+    // Delete some of the keys
+    final int numDeletes = records.size() / 2;
+    List<HoodieKey> keysToDelete = records.stream().limit(numDeletes).map(r -> new HoodieKey(r.getRecordKey(), r.getPartitionPath())).collect(Collectors.toList());
+    String deleteCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    writeClient.startCommitWithTime(deleteCommitTime);
+    writeStatues = writeClient.delete(jsc.parallelize(keysToDelete, 1), deleteCommitTime);
+    assertNoWriteErrors(writeStatues.collect());
+    writeClient.commit(deleteCommitTime, writeStatues);
+
+    // Deleted records should not be found in the index
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    javaRDD = tagLocation(hoodieTable.getIndex(), jsc.parallelize(records.subList(0, numDeletes)), hoodieTable);
+    assertEquals(0, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+    assertEquals(numDeletes, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+
+    // Other records should be found
+    javaRDD = tagLocation(hoodieTable.getIndex(), jsc.parallelize(records.subList(numDeletes, records.size())), hoodieTable);
+    assertEquals(records.size() - numDeletes, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+    assertEquals(records.size() - numDeletes, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+  }
+
   private HoodieWriteConfig.Builder getConfigBuilder() {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index d37bee23688..be663d05bfe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -63,6 +63,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -312,7 +313,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
         && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
   }
 
-  @Test
+  @Disabled("HUDI-6460")
   public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws Exception {
     final int numRecords = 10;
     final String oldPartitionPath = "1970/01/01";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 1ee019eb713..2a519d1334b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -233,11 +233,10 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   /**
   * Sets the new currentLocation of the record, after being written. This again should happen exactly-once.
    */
-  public HoodieRecord setNewLocation(HoodieRecordLocation location) {
+  public void setNewLocation(HoodieRecordLocation location) {
     checkState();
     assert newLocation == null;
     this.newLocation = location;
-    return this;
   }
 
   @Nullable
@@ -311,6 +310,16 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
 
   protected abstract T readRecordPayload(Kryo kryo, Input input);
 
+  /**
+   * Clears the new currentLocation of the record. 
+   *
+   * This is required in the delete path so that Index can track that this record was deleted.
+   */
+  public void clearNewLocation() {
+    checkState();
+    this.newLocation = null;
+  }
+
   /**
    * NOTE: This method is declared final to make sure there's no polymorphism and therefore
    *       JIT compiler could perform more aggressive optimizations
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 38da0cac7a8..21983f31f66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -328,8 +328,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         records.merge(
             logRecord.getRecordKey(),
             logRecord,
-            (oldRecord, newRecord) ->
-                new HoodieAvroRecord<>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData()))
+            (oldRecord, newRecord) -> {
+              HoodieMetadataPayload mergedPayload = newRecord.getData().preCombine(oldRecord.getData());
+              return mergedPayload.isDeleted() ? null : new HoodieAvroRecord<>(oldRecord.getKey(), mergedPayload);
+            }
         ));
 
     timings.add(timer.endTimer());
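
The merge change above leans on java.util.Map#merge semantics: when the remapping function returns null, the key is removed from the map, so a record-index entry whose merged payload is a delete simply drops out of the in-memory log-record view. A small standalone example of that behaviour (class and value names are illustrative only):

    import java.util.HashMap;
    import java.util.Map;

    public class MergeRemovalExample {
      public static void main(String[] args) {
        Map<String, String> recordIndex = new HashMap<>();
        recordIndex.put("key1", "fileId-A");

        // A later "delete" payload merges with the existing mapping; returning null
        // from the remapping function removes the key entirely.
        recordIndex.merge("key1", "delete-marker", (oldVal, newVal) -> null);

        System.out.println(recordIndex.containsKey("key1")); // prints: false
      }
    }
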
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 888833b45a0..9b4698c0450 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -209,9 +210,10 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   private HoodieMetadataBloomFilter bloomFilterMetadata = null;
   private HoodieMetadataColumnStats columnStatMetadata = null;
   private HoodieRecordIndexInfo recordIndexMetadata;
+  private boolean isDeletedRecord = false;
 
-  public HoodieMetadataPayload(GenericRecord record, Comparable<?> orderingVal) {
-    this(Option.of(record));
+  public HoodieMetadataPayload(@Nullable GenericRecord record, Comparable<?> orderingVal) {
+    this(Option.ofNullable(record));
   }
 
   public HoodieMetadataPayload(Option<GenericRecord> recordOpt) {
@@ -283,6 +285,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
             Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILE_INDEX).toString()),
             Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()));
       }
+    } else {
+      this.isDeletedRecord = true;
     }
   }
 
@@ -403,6 +407,16 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
 
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
+    if (this.isDeletedRecord) {
+      // This happens when a record has been deleted. The previous version of the record should be ignored.
+      return this;
+    }
+    if (previousRecord.isDeletedRecord) {
+      // This happens when a record with same key is added after a deletion.
+      return this;
+    }
+
+    // Validation of record merge scenario. Only records of same type and key can be combined.
     checkArgument(previousRecord.type == type,
         "Cannot combine " + previousRecord.type + " with " + type);
     checkArgument(previousRecord.key.equals(key),
@@ -419,6 +433,11 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
       case METADATA_TYPE_COLUMN_STATS:
         return new HoodieMetadataPayload(key, combineColumnStatsMetadata(previousRecord));
       case METADATA_TYPE_RECORD_INDEX:
+        // There is always a single mapping and the latest mapping is maintained.
+        // Mappings in record index can change in two scenarios:
+        // 1. A key deleted from dataset and then added again (new filedID)
+        // 2. A key moved to a different file due to clustering
+
         // No need to merge with previous record index, always pick the latest payload.
         return this;
       default:
@@ -455,7 +474,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
 
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schemaIgnored, Properties propertiesIgnored) throws IOException {
-    if (key == null) {
+    if (key == null || this.isDeletedRecord) {
       return Option.empty();
     }
 
@@ -767,6 +786,10 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
     return new HoodieRecordGlobalLocation(partition, HoodieActiveTimeline.formatDate(instantDate), fileId);
   }
 
+  public boolean isDeleted() {
+    return isDeletedRecord;
+  }
+
   @Override
   public boolean equals(Object other) {
     if (other == this) {
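
Taken together, the payload changes make deletes flow through merging: a payload built from an empty record marks itself deleted, preCombine lets the newer payload win whether it is a delete or a re-insert, and getInsertValue returns empty for deleted payloads so they are filtered out on read. A rough standalone sketch of those semantics (illustrative types, not the real HoodieMetadataPayload):

    import java.util.Optional;

    public class DeleteAwarePayloadSketch {
      static final class Payload {
        final String location;      // null models a delete payload
        final boolean deleted;

        Payload(String location) {
          this.location = location;
          this.deleted = (location == null);
        }

        // Newer payload (this) combined with an older one: the newer state always wins,
        // whether it is a delete or a mapping written after a delete.
        Payload preCombine(Payload previous) {
          return this;
        }

        // Deleted payloads produce no value on read, so the key disappears from lookups.
        Optional<String> getInsertValue() {
          return deleted ? Optional.empty() : Optional.of(location);
        }
      }

      public static void main(String[] args) {
        Payload insert = new Payload("fileId-A@commit1");
        Payload delete = new Payload(null);
        Payload reinsert = new Payload("fileId-B@commit3");

        System.out.println(delete.preCombine(insert).getInsertValue());   // Optional.empty
        System.out.println(reinsert.preCombine(delete).getInsertValue()); // Optional[fileId-B@commit3]
      }
    }
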
