This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f2159c4  [HUDI-1660] Excluding compaction and clustering instants from inflight rollback (#2631)
f2159c4 is described below

commit f2159c4573810f922fadff640a953175a852dc43
Author: n3nash <nagar...@uber.com>
AuthorDate: Fri Mar 5 11:18:09 2021 -0800

    [HUDI-1660] Excluding compaction and clustering instants from inflight rollback (#2631)
---
 .../hudi/client/AbstractHoodieWriteClient.java     | 27 +++++++++++-----------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 6ce0564..a3ba008 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import com.codahale.metrics.Timer;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -32,7 +33,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -765,21 +765,20 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
   }
 
   private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) {
+    Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(table)
+        .getReverseOrderedInstants();
     if (config.getFailedWritesCleanPolicy().isEager()) {
-      HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
-      return inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
-          .collect(Collectors.toList());
-    } else if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.NEVER) {
-      return Collections.EMPTY_LIST;
+      return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     } else if (config.getFailedWritesCleanPolicy().isLazy()) {
-      return table.getMetaClient().getActiveTimeline()
-          .getCommitsTimeline().filterInflights().getReverseOrderedInstants().filter(instant -> {
-            try {
-              return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
-            } catch (IOException io) {
-              throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
-            }
-          }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      return inflightInstantsStream.filter(instant -> {
+        try {
+          return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+        } catch (IOException io) {
+          throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+        }
+      }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    } else if (config.getFailedWritesCleanPolicy().isNever()) {
+      return Collections.EMPTY_LIST;
     } else {
       throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
     }

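The helper this hunk calls, getInflightTimelineExcludeCompactionAndClustering(table), is not shown in the diff. As a rough sketch only (assuming HoodieTimeline.filter and org.apache.hudi.common.util.ClusteringUtils.getClusteringPlan behave as elsewhere in hudi-common; the committed method may differ), it could look like this:

    // Sketch, not the committed code: build the pending timeline while skipping
    // compaction instants and any inflight replacecommit backed by a clustering plan.
    private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTable<T, I, K, O> table) {
      // filterPendingExcludingCompaction() already removes pending compactions.
      HoodieTimeline pendingTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
      // Also drop replacecommits that carry a clustering plan, so inflight
      // clustering is never picked up for rollback.
      return pendingTimeline.filter(instant -> {
        if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
          return !ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant).isPresent();
        }
        return true;
      });
    }

Sharing one pre-filtered stream across the eager and lazy branches keeps both policies from rolling back pending compaction or clustering work, which is the point of this change.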