This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new f2159c4  [HUDI-1660] Excluding compaction and clustering instants from inflight rollback (#2631)
f2159c4 is described below

commit f2159c4573810f922fadff640a953175a852dc43
Author: n3nash <nagar...@uber.com>
AuthorDate: Fri Mar 5 11:18:09 2021 -0800

    [HUDI-1660] Excluding compaction and clustering instants from inflight rollback (#2631)
---
 .../hudi/client/AbstractHoodieWriteClient.java | 27 +++++++++++-----------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 6ce0564..a3ba008 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;

 import com.codahale.metrics.Timer;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -32,7 +33,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -765,21 +765,20 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
   }

   private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) {
+    Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(table)
+        .getReverseOrderedInstants();
     if (config.getFailedWritesCleanPolicy().isEager()) {
-      HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
-      return inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
-          .collect(Collectors.toList());
-    } else if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.NEVER) {
-      return Collections.EMPTY_LIST;
+      return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     } else if (config.getFailedWritesCleanPolicy().isLazy()) {
-      return table.getMetaClient().getActiveTimeline()
-          .getCommitsTimeline().filterInflights().getReverseOrderedInstants().filter(instant -> {
-            try {
-              return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
-            } catch (IOException io) {
-              throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
-            }
-          }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      return inflightInstantsStream.filter(instant -> {
+        try {
+          return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+        } catch (IOException io) {
+          throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+        }
+      }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    } else if (config.getFailedWritesCleanPolicy().isNever()) {
+      return Collections.EMPTY_LIST;
     } else {
       throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
     }