This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6826101d6b7 [MINOR] Remove extra timeline reload during conflict
resolution (#12272)
6826101d6b7 is described below
commit 6826101d6b737d04b9510a2f4c7aa79e46c13d5f
Author: Tim Brown <[email protected]>
AuthorDate: Mon Dec 2 20:22:59 2024 -0600
[MINOR] Remove extra timeline reload during conflict resolution (#12272)
---
.../java/org/apache/hudi/client/BaseHoodieClient.java | 2 +-
.../org/apache/hudi/client/utils/TransactionUtils.java | 17 +++++++++++------
.../table/action/commit/BaseCommitActionExecutor.java | 4 ++--
...eConcurrentFileWritesConflictResolutionStrategy.java | 4 ++--
4 files changed, 16 insertions(+), 11 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index c054a817b52..26a5b191377 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -227,7 +227,7 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
try {
TransactionUtils.resolveWriteConflictIfAny(table,
this.txnManager.getCurrentTransactionOwner(),
- Option.of(metadata), config,
txnManager.getLastCompletedTransactionOwner(), false,
pendingInflightAndRequestedInstants);
+ Option.of(metadata), config,
txnManager.getLastCompletedTransactionOwner(), true,
pendingInflightAndRequestedInstants);
metrics.emitConflictResolutionSuccessful();
} catch (HoodieWriteConflictException e) {
metrics.emitConflictResolutionFailed();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index 6e7bf12867d..ae2e9c20804 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -66,17 +66,17 @@ public class TransactionUtils {
final Option<HoodieCommitMetadata> thisCommitMetadata,
final HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
- boolean reloadActiveTimeline,
+ boolean timelineRefreshedWithinTransaction,
Set<String> pendingInstants) throws HoodieWriteConflictException {
WriteOperationType operationType =
thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null);
if (config.needResolveWriteConflict(operationType)) {
// deal with pendingInstants
- Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation =
getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(),
pendingInstants);
-
- ConflictResolutionStrategy resolutionStrategy =
config.getWriteConflictResolutionStrategy();
- if (reloadActiveTimeline) {
+ if (!timelineRefreshedWithinTransaction) {
table.getMetaClient().reloadActiveTimeline();
}
+ Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation =
getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(),
pendingInstants);
+ ConflictResolutionStrategy resolutionStrategy =
config.getWriteConflictResolutionStrategy();
+
Stream<HoodieInstant> instantStream =
Stream.concat(resolutionStrategy.getCandidateInstants(
table.getMetaClient(), currentTxnOwnerInstant.get(),
lastCompletedTxnOwnerInstant),
completedInstantsDuringCurrentWriteOperation);
@@ -146,12 +146,17 @@ public class TransactionUtils {
.collect(Collectors.toSet());
}
+ /**
+ * Helper to find the instants that completed during this operation.
+ * @param metaClient client that was created or refreshed within the
transaction
+ * @param pendingInstants pending instants to compare
+ * @return instants that completed during this operation
+ */
public static Stream<HoodieInstant>
getCompletedInstantsDuringCurrentWriteOperation(HoodieTableMetaClient
metaClient, Set<String> pendingInstants) {
// deal with pendingInstants
// some pending instants maybe finished during current write operation,
// we should check the conflict of those pending operation
return metaClient
- .reloadActiveTimeline()
.getCommitsTimeline()
.filterCompletedInstants()
.getInstantsAsStream()
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 055fd7e10ba..841a3553dd0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -198,9 +198,9 @@ public abstract class BaseCommitActionExecutor<T, I, K, O,
R>
lastCompletedTxn.isPresent() ?
Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try {
setCommitMetadata(result);
- // reload active timeline so as to get all updates after current
transaction have started. hence setting last arg to true.
+ // table instance is created outside the transaction boundary so setting
`timelineRefreshedWithinTransaction` to false below
TransactionUtils.resolveWriteConflictIfAny(table,
txnManager.getCurrentTransactionOwner(),
- result.getCommitMetadata(), config,
txnManager.getLastCompletedTransactionOwner(), true,
pendingInflightAndRequestedInstants);
+ result.getCommitMetadata(), config,
txnManager.getLastCompletedTransactionOwner(), false,
pendingInflightAndRequestedInstants);
commit(result);
} finally {
txnManager.endTransaction(inflightInstant);
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
index ee08b94da95..9488fd8d797 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -411,11 +411,11 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
SimpleConcurrentFileWritesConflictResolutionStrategy();
// make sure c3 has conflict with C1,C11,C12,C4;
HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant, "file-2");
- timeline.reload();
+ metaClient.reloadActiveTimeline();
List<HoodieInstant> completedInstantsDuringCurrentWriteOperation =
TransactionUtils
.getCompletedInstantsDuringCurrentWriteOperation(metaClient,
pendingInstant).collect(Collectors.toList());
// C1,C11,C12,C4 should be included
- Assertions.assertTrue(completedInstantsDuringCurrentWriteOperation.size()
== 4);
+ Assertions.assertEquals(4,
completedInstantsDuringCurrentWriteOperation.size());
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
// check C3 has conflict with C1,C11,C12,C4