This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d3311c34c8 [HUDI-6443] Support delete_partition, insert_overwrite/table with record-level index (#9055)
6d3311c34c8 is described below

commit 6d3311c34c8548339aee8c3708fe50855a04afb8
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Mon Jul 10 01:24:44 2023 +0800

    [HUDI-6443] Support delete_partition, insert_overwrite/table with record-level index (#9055)

    - Support delete_partition, insert_overwrite and insert_overwrite_table with record-level index. The metadata records should be updated accordingly:
      - All records in the deleted partition(s) should be deleted from RLI (for the delete_partition operation).
      - Newly inserted records should be present in RLI.
      - Old records in the affected partitions should be removed from RLI.
      - Old records that happen to have the same record key as the new inserts won't be removed from RLI; their entries will be updated.
    ---------
    
    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 65 ++++++++++++++++++----
 .../hudi/functional/TestRecordLevelIndex.scala     | 28 +++++++++-
 2 files changed, 80 insertions(+), 13 deletions(-)
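
A minimal, self-contained sketch of the "left anti join" semantics described in the commit
message above, using plain Java collections rather than Hudi's HoodieData abstraction (the
class name RliOverwriteSketch and the record keys below are hypothetical): keys that were
indexed under the replaced file groups and are not re-inserted by the overwrite become RLI
deletes, while re-inserted keys are simply updated through the regular write path.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class RliOverwriteSketch {
      public static void main(String[] args) {
        // Record keys currently indexed under the replaced file groups (hypothetical data).
        List<String> replacedKeys = Arrays.asList("key-1", "key-2", "key-3");
        // Record keys written by the insert_overwrite operation (hypothetical data).
        Set<String> overwriteKeys = new HashSet<>(Arrays.asList("key-2"));

        // Left anti join: keep only keys that are NOT re-inserted; these become RLI deletes.
        List<String> rliDeletes = replacedKeys.stream()
            .filter(key -> !overwriteKeys.contains(key))
            .collect(Collectors.toList());

        System.out.println(rliDeletes); // prints [key-1, key-3]
      }
    }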

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 97d1ce5e8b2..df73145a1bb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -42,7 +42,9 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -477,7 +479,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         + partitions.size() + " partitions");
 
     // Collect record keys from the files in parallel
-    HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, false);
     records.persist("MEMORY_AND_DISK_SER");
     final long recordCount = records.count();
 
@@ -495,7 +497,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * Read the record keys from base files in partitions and return records.
    */
   private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
-                                                               List<Pair<String, String>> partitionBaseFilePairs) {
+                                                               List<Pair<String, String>> partitionBaseFilePairs,
+                                                               boolean forDelete) {
     if (partitionBaseFilePairs.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
@@ -524,8 +527,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
         @Override
         public HoodieRecord next() {
-          return HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId,
-              instantTime);
+          return forDelete
+              ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
+              : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime);
         }
       };
     });
@@ -872,9 +876,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
      // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
-      if (writeStatus != null && !writeStatus.isEmpty()) {
-        partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, getRecordIndexUpdates(writeStatus));
-      }
+      HoodieData<HoodieRecord> updatesFromWriteStatuses = getRecordIndexUpdates(writeStatus);
+      HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpdates(updatesFromWriteStatuses, commitMetadata);
+      partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, updatesFromWriteStatuses.union(additionalUpdates));
+
       return partitionToRecordMap;
     });
     closeInternal();
@@ -884,7 +889,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * Update from {@code HoodieCleanMetadata}.
    *
    * @param cleanMetadata {@code HoodieCleanMetadata}
-   * @param instantTime Timestamp at which the clean was completed
+   * @param instantTime   Timestamp at which the clean was completed
    */
   @Override
   public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
@@ -897,7 +902,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * Update from {@code HoodieRestoreMetadata}.
    *
    * @param restoreMetadata {@code HoodieRestoreMetadata}
-   * @param instantTime Timestamp at which the restore was performed
+   * @param instantTime     Timestamp at which the restore was performed
    */
   @Override
   public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
@@ -911,7 +916,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * Update from {@code HoodieRollbackMetadata}.
    *
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
-   * @param instantTime Timestamp at which the rollback was performed
+   * @param instantTime      Timestamp at which the rollback was performed
    */
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
@@ -1225,6 +1230,46 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         .filter(Objects::nonNull);
   }
 
+  private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
+    final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+    List<Pair<String, String>> partitionBaseFilePairs = replaceCommitMetadata
+        .getPartitionToReplaceFileIds()
+        .keySet().stream().flatMap(partition
+            -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f.getFileName())))
+        .collect(Collectors.toList());
+
+    return readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, true);
+  }
+
+  private HoodieData<HoodieRecord> getRecordIndexAdditionalUpdates(HoodieData<HoodieRecord> updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
+    WriteOperationType operationType = commitMetadata.getOperationType();
+    if (operationType == WriteOperationType.INSERT_OVERWRITE) {
+      // load existing records from replaced filegroups and left anti join overwriting records
+      // return partition-level unmatched records (with newLocation being null) to be deleted from RLI
+      return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata)
+          .mapToPair(r -> Pair.of(r.getKey(), r))
+          .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r -> Pair.of(r.getKey(), r)))
+          .values()
+          .filter(p -> !p.getRight().isPresent())
+          .map(Pair::getLeft);
+    } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+      // load existing records from replaced filegroups and left anti join overwriting records
+      // return globally unmatched records (with newLocation being null) to be deleted from RLI
+      return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata)
+          .mapToPair(r -> Pair.of(r.getRecordKey(), r))
+          .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r -> Pair.of(r.getRecordKey(), r)))
+          .values()
+          .filter(p -> !p.getRight().isPresent())
+          .map(Pair::getLeft);
+    } else if (operationType == WriteOperationType.DELETE_PARTITION) {
+      // all records from the target partition(s) to be deleted from RLI
+      return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata);
+    } else {
+      return engineContext.emptyHoodieData();
+    }
+  }
+
   protected void closeInternal() {
     try {
       close();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index e9100a518bf..614a412c4a5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -25,8 +25,8 @@ import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.MetadataConversionUtils
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{ActionType, HoodieBaseFile, HoodieCommitMetadata, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
@@ -108,7 +108,6 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
       saveMode = SaveMode.Append)
   }
 
-  @Disabled("needs delete support")
   @ParameterizedTest
   @CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false"))
   def testRLIBulkInsertThenInsertOverwrite(tableType: HoodieTableType, enableRowWriter: Boolean): Unit = {
@@ -200,6 +199,26 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
     validateDataAndRecordIndices(hudiOpts)
   }
 
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIWithDeletePartition(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    val latestSnapshot = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+
+    Using(getHoodieWriteClient(getWriteConfig(hudiOpts))) { client =>
+      val commitTime = client.startCommit
+      client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION)
+      val deletingPartition = dataGen.getPartitionPaths.last
+      val partitionList = Collections.singletonList(deletingPartition)
+      client.deletePartitions(partitionList, commitTime)
+
+      val deletedDf = latestSnapshot.filter(s"partition = $deletingPartition")
+      validateDataAndRecordIndices(hudiOpts, deletedDf)
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
   def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = {
@@ -503,15 +522,18 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
      latestBatch = recordsToStrings(dataGen.generateInserts(getInstantTime(), 5)).asScala
     }
    val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 2))
+    latestBatchDf.cache()
     latestBatchDf.write.format("org.apache.hudi")
       .options(hudiOpts)
       .option(DataSourceWriteOptions.OPERATION.key, operation)
       .mode(saveMode)
       .save(basePath)
     val deletedDf = calculateMergedDf(latestBatchDf, operation)
+    deletedDf.cache()
     if (validate) {
       validateDataAndRecordIndices(hudiOpts, deletedDf)
     }
+    deletedDf.unpersist()
     latestBatchDf
   }
 
