This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 0fbf3128196 [HUDI-8040] Fix SimpleConcurrentFileWritesConflictResolutionStrategy get clustering instant contains insert overwrite (#11691) 0fbf3128196 is described below commit 0fbf31281964a74e1d1595bc830cf04e8196063e Author: xiaodong <dongtingting8...@163.com> AuthorDate: Sat Aug 3 09:24:11 2024 +0800 [HUDI-8040] Fix SimpleConcurrentFileWritesConflictResolutionStrategy get clustering instant contains insert overwrite (#11691) --- ...urrentFileWritesConflictResolutionStrategy.java | 3 +++ .../TestConflictResolutionStrategyUtil.java | 13 +++++++++++ ...urrentFileWritesConflictResolutionStrategy.java | 26 ++++++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java index c162aee380a..3c2c0d21e3e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieWriteConflictException; @@ -62,6 +63,8 @@ public class SimpleConcurrentFileWritesConflictResolutionStrategy Stream<HoodieInstant> compactionAndClusteringPendingTimeline = activeTimeline 
.filterPendingReplaceClusteringAndCompactionTimeline() + .filter(instant -> ClusteringUtils.isClusteringInstant(activeTimeline, instant) + || HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) .findInstantsAfter(currentInstant.getTimestamp()) .getInstantsAsStream(); return Stream.concat(completedCommitsInstantStream, compactionAndClusteringPendingTimeline); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java index d6988cae0fd..1ebe3f05d1e 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java @@ -144,6 +144,19 @@ public class TestConflictResolutionStrategyUtil { .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } + public static void createPendingInsertOverwrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws Exception { + //insert_overwrite + String fileId1 = "file-1"; + String fileId2 = "file-2"; + + HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); + HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + + HoodieTestTable.of(metaClient) + .addPendingReplace(instantTime, Option.of(requestedReplaceMetadata), Option.empty()) + .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); + } + public static void createReplace(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws Exception { String fileId1 = "file-1"; String fileId2 = "file-2"; diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java index d2ee5d67285..a2ace63bdb6 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -51,6 +51,7 @@ import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyU import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCluster; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingInsertOverwrite; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRequestedCommit; @@ -346,6 +347,31 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho } } + @Test + public void tstConcurrentWritesWithPendingInsertOverwriteReplace() throws Exception { + createCommit(metaClient.createNewInstantTime(), metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + // consider commits before this are all successful + Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + + // writer 1 starts + String currentWriterInstant = metaClient.createNewInstantTime(); + 
createInflightCommit(currentWriterInstant, metaClient); + + // insert_overwrite 1 gets scheduled and inflighted + String newInstantTime = metaClient.createNewInstantTime(); + createPendingInsertOverwrite(newInstantTime, WriteOperationType.INSERT_OVERWRITE, metaClient); + + Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant)); + SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy(); + HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant); + metaClient.reloadActiveTimeline(); + List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + // writer 1 does not conflict with insert_overwrite 1 + Assertions.assertTrue(candidateInstants.size() == 0); + } + // try to simulate HUDI-3355 @Test public void testConcurrentWritesWithPendingInstants() throws Exception {