This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b95248e0119 [HUDI-6151] Rollback previously applied commits to MDT when operations are retried (#8604)
b95248e0119 is described below

commit b95248e011931f4748a7a9fbb8298cbbb71bda88
Author: Prashant Wason <pwa...@uber.com>
AuthorDate: Thu Jun 29 01:59:08 2023 -0700

    [HUDI-6151] Rollback previously applied commits to MDT when operations are retried (#8604)

    Operations like Clean and Compaction are retried after failures with the
    same instant time. If the previous run of the operation successfully
    committed to the MDT but failed to commit to the dataset, the operation
    will later be retried with the same instantTime, causing duplicate updates
    to be applied to the MDT.

    Currently, we simply delete the completed deltacommit without rolling back
    its changes.

    To handle this, we detect a replay of the operation and roll back any
    changes from that operation in the MDT.
    
    ---------
    
    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../FlinkHoodieBackedTableMetadataWriter.java      | 50 ++++++++--------
 .../SparkHoodieBackedTableMetadataWriter.java      | 38 ++++++------
 .../functional/TestHoodieBackedMetadata.java       | 68 +++++++++++++++++++++-
 3 files changed, 113 insertions(+), 43 deletions(-)
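
For orientation, here is a condensed sketch of the new control flow shared by
the Flink and Spark metadata writers (simplified from the diff below; logging
details and the surrounding method context are trimmed, so treat it as
illustrative rather than the exact committed code):

    if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
      // First time this instant is being applied to the MDT.
      LOG.info("New commit at " + instantTime + " being applied to MDT.");
    } else {
      // Replay: an earlier attempt committed (fully or partially) to the MDT,
      // so roll back its deltacommit before re-applying the records.
      if (!writeClient.rollback(instantTime)) {
        throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
      }
      metadataMetaClient.reloadActiveTimeline();
    }
    // Start a fresh deltacommit at the same instant time and apply the records.
    writeClient.startCommitWithTime(instantTime);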

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 7dd32e2916e..6edeac05a74 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -32,9 +32,13 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -46,7 +50,7 @@ import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER
  * Flink hoodie backed table metadata writer.
  */
 public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
-
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
   private transient BaseHoodieWriteClient writeClient;
 
   public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
@@ -118,33 +122,31 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter
 
       if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
-        writeClient.startCommitWithTime(instantTime);
-        metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
       } else {
-        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
-        if (alreadyCompletedInstant.isPresent()) {
-          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-          // are upserts to metadata table and so only a new delta commit will be created.
-          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
-          metadataMetaClient.reloadActiveTimeline();
+        // this code path refers to a re-attempted commit that:
+        //   1. got committed to metadata table, but failed in datatable.
+        //   2. failed while committing to metadata table
+        // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will be created.
+        // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
         }
-        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
-        // instant with the same instant time.  This happens for data table clean action which
-        // reuses the same instant time without rollback first.  It is a no-op here as the
-        // clean plan is the same, so we don't need to delete the requested and inflight instant
-        // files in the active timeline.
-
-        // The metadata writer uses LAZY cleaning strategy without auto commit,
-        // write client then checks the heartbeat expiration when committing the instant,
-        // sets up the heartbeat explicitly to make the check pass.
-        writeClient.getHeartbeatClient().start(instantTime);
+        metadataMetaClient.reloadActiveTimeline();
       }
 
+      writeClient.startCommitWithTime(instantTime);
+      metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+
       List<WriteStatus> statuses = isInitializing
          ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
           : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index eab0c436248..9a254409b8d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -29,13 +29,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -144,27 +144,29 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter
 
       if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
-        writeClient.startCommitWithTime(instantTime);
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
       } else {
-        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
-        if (alreadyCompletedInstant.isPresent()) {
-          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-          // are upserts to metadata table and so only a new delta commit will be created.
-          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
-          metadataMetaClient.reloadActiveTimeline();
+        // this code path refers to a re-attempted commit that:
+        //   1. got committed to metadata table, but failed in datatable.
+        //   2. failed while committing to metadata table
+        // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will be created.
+        // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
         }
-        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
-        // instant with the same instant time.  This happens for data table clean action which
-        // reuses the same instant time without rollback first.  It is a no-op here as the
-        // clean plan is the same, so we don't need to delete the requested and inflight instant
-        // files in the active timeline.
+        metadataMetaClient.reloadActiveTimeline();
       }
 
+      writeClient.startCommitWithTime(instantTime);
       if (bulkInsertPartitioner.isPresent()) {
        writeClient.bulkInsertPreppedRecords(preppedRecordRDD, instantTime, bulkInsertPartitioner).collect();
       } else {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index b50c002468f..84417fce958 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -2029,7 +2030,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
         .withAutoCommit(false)
-        .withClusteringConfig(clusteringConfig).build();
+        .withClusteringConfig(clusteringConfig)
+        .withRollbackUsingMarkers(false)
+        .build();
 
     // trigger clustering
     SparkRDDWriteClient newClient = getHoodieWriteClient(newWriteConfig);
@@ -2821,6 +2824,69 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  /**
+   * Test duplicate operation with same instant timestamp.
+   *
+   * This can happen if the commit on the MDT succeeds but fails on the dataset. For some table services like clean,
+   * compaction, replace commit, the operation will be retried with the same timestamp (taken from inflight). Hence,
+   * metadata table will see an additional commit with the same timestamp as a previously completed deltacommit.
+   */
+  @Test
+  public void testRepeatedActionWithSameInstantTime() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    Properties props = new Properties();
+    props.put(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS.key(), "false");
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false).withProps(props).build();
+
+    // Perform three writes so we can initiate a clean
+    int index = 0;
+    final String partition = "2015/03/16";
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      for (; index < 3; ++index) {
+        String newCommitTime = "00" + index;
+        List<HoodieRecord> records = index == 0 ? dataGen.generateInsertsForPartition(newCommitTime, 10, partition)
+            : dataGen.generateUniqueUpdates(newCommitTime, 5);
+        client.startCommitWithTime(newCommitTime);
+        client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      }
+    }
+    assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // Perform a clean
+      String cleanInstantTime = "00" + index++;
+      HoodieCleanMetadata cleanMetadata = client.clean(cleanInstantTime);
+      // 1 partition should be cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
+      // 1 file cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+
+      // To simulate failed clean on the main dataset, we will delete the completed clean instant
+      String cleanInstantFileName = HoodieTimeline.makeCleanerFileName(cleanInstantTime);
+      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+          cleanInstantFileName), false));
+      assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflights().countInstants(), 1);
+      assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants(), 0);
+
+      // Initiate another clean. The previous leftover clean will be attempted and no other clean will be scheduled.
+      String newCleanInstantTime = "00" + index++;
+      cleanMetadata = client.clean(newCleanInstantTime);
+
+      // 1 partition should be cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
+      // 1 file cleaned but was already deleted so will be a failed delete
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+
+      validateMetadata(client);
+    }
+  }
+
   private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
     doPreBootstrapOperations(testTable, "0000001", "0000002");
   }
