This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fbf3128196 [HUDI-8040] Fix 
SimpleConcurrentFileWritesConflictResolutionStrategy get clustering instant 
contains insert overwrite (#11691)
0fbf3128196 is described below

commit 0fbf31281964a74e1d1595bc830cf04e8196063e
Author: xiaodong <dongtingting8...@163.com>
AuthorDate: Sat Aug 3 09:24:11 2024 +0800

    [HUDI-8040] Fix SimpleConcurrentFileWritesConflictResolutionStrategy get 
clustering instant contains insert overwrite (#11691)
---
 ...urrentFileWritesConflictResolutionStrategy.java |  3 +++
 .../TestConflictResolutionStrategyUtil.java        | 13 +++++++++++
 ...urrentFileWritesConflictResolutionStrategy.java | 26 ++++++++++++++++++++++
 3 files changed, 42 insertions(+)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index c162aee380a..3c2c0d21e3e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieWriteConflictException;
@@ -62,6 +63,8 @@ public class 
SimpleConcurrentFileWritesConflictResolutionStrategy
 
     Stream<HoodieInstant> compactionAndClusteringPendingTimeline = 
activeTimeline
         .filterPendingReplaceClusteringAndCompactionTimeline()
+        .filter(instant -> ClusteringUtils.isClusteringInstant(activeTimeline, 
instant)
+            || HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()))
         .findInstantsAfter(currentInstant.getTimestamp())
         .getInstantsAsStream();
     return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringPendingTimeline);
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index d6988cae0fd..1ebe3f05d1e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -144,6 +144,19 @@ public class TestConflictResolutionStrategyUtil {
         
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
   }
 
+  public static void createPendingInsertOverwrite(String instantTime, 
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws 
Exception {
+    // insert_overwrite: pending replace whose requested metadata is left empty
+    String fileId1 = "file-1";
+    String fileId2 = "file-2";
+
+    // writeOperationType is unused here; signature mirrors createReplace
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
+
+    HoodieTestTable.of(metaClient)
+        .addPendingReplace(instantTime, Option.of(requestedReplaceMetadata), 
Option.empty())
+        
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+  }
+
   public static void createReplace(String instantTime, WriteOperationType 
writeOperationType, HoodieTableMetaClient metaClient) throws Exception {
     String fileId1 = "file-1";
     String fileId2 = "file-2";
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
index d2ee5d67285..a2ace63bdb6 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -51,6 +51,7 @@ import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCluster;
+import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingInsertOverwrite;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRequestedCommit;
 
@@ -346,6 +347,31 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     }
   }
 
+  @Test
+  public void testConcurrentWritesWithPendingInsertOverwriteReplace() throws 
Exception {
+    createCommit(metaClient.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // consider all commits before this point successful
+    Option<HoodieInstant> lastSuccessfulInstant = 
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // writer 1 starts
+    String currentWriterInstant = metaClient.createNewInstantTime();
+    createInflightCommit(currentWriterInstant, metaClient);
+
+    // insert_overwrite 1 is scheduled after writer 1 and is still pending
+    String newInstantTime = metaClient.createNewInstantTime();
+    createPendingInsertOverwrite(newInstantTime, 
WriteOperationType.INSERT_OVERWRITE, metaClient);
+
+    Option<HoodieInstant> currentInstant = Option.of(new 
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, 
currentWriterInstant));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new 
SimpleConcurrentFileWritesConflictResolutionStrategy();
+    // refresh the timeline before computing the candidate instants
+    metaClient.reloadActiveTimeline();
+    List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
+        Collectors.toList());
+    // the pending insert_overwrite must not be a conflict candidate for writer 1
+    Assertions.assertEquals(0, candidateInstants.size());
+  }
+
   // try to simulate HUDI-3355
   @Test
   public void testConcurrentWritesWithPendingInstants() throws Exception {

Reply via email to