This is an automated email from the ASF dual-hosted git repository.
ascherbakoff pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 83b124b70ea IGNITE-25825 Async write intent cleanup (#7999)
83b124b70ea is described below
commit 83b124b70eaabca69f5a477c2294bfe0201aa4d3
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Thu Apr 23 13:43:40 2026 +0300
IGNITE-25825 Async write intent cleanup (#7999)
---
.../client/ItThinClientTransactionsTest.java | 6 +-
.../internal/client/tx/ClientTransaction.java | 51 +++++--
.../handlers/TxFinishReplicaRequestHandler.java | 16 ++-
.../ItPrimaryReplicaChoiceTest.java | 3 +-
.../internal/sql/engine/ItOrToUnionRuleTest.java | 3 +
.../ItTxAbstractDistributedTestSingleNode.java | 4 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 1 +
.../TablePartitionResourcesFactory.java | 1 +
.../DefaultTablePartitionReplicaProcessor.java | 148 ++++++++++++++++++++-
...ePartitionReplicaProcessorIndexLockingTest.java | 1 +
...tionReplicaProcessorSortedIndexLockingTest.java | 1 +
.../DefaultTablePartitionReplicaProcessorTest.java | 1 +
.../ZonePartitionReplicaListenerTest.java | 1 +
.../storage/InternalTableEstimatedSizeTest.java | 3 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 1 +
.../ignite/internal/table/TxAbstractTest.java | 8 +-
.../table/impl/DummyInternalTableImpl.java | 1 +
.../internal/tx/impl/ReadWriteTransactionImpl.java | 12 +-
18 files changed, 223 insertions(+), 39 deletions(-)
diff --git
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
index 7d87e7da0ad..b0d6c8b0e79 100644
---
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
+++
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
@@ -481,7 +481,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
Tuple key2 = tuples.get(1);
assertThat(ctx.put.apply(client, tx, key2),
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is
killed"));
- assertThat(tx.commitAsync(), willSucceedFast());
+ assertThat(tx.commitAsync(),
willThrowWithCauseOrSuppressed(TransactionException.class));
// Validate lock possibility.
assertThat(kvView.removeAllAsync(null, Arrays.asList(key, key2)),
willSucceedFast());
@@ -594,7 +594,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
await().atMost(3, TimeUnit.SECONDS).until(() ->
tx.startedTx().killed());
- assertThat(tx.commitAsync(), willSucceedFast());
+ assertThat(tx.commitAsync(),
willThrowWithCauseOrSuppressed(TransactionException.class));
// Validate lock possibility.
assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key1,
key2)), willSucceedFast());
@@ -1648,6 +1648,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
assertThat(ctx.put.apply(client(), youngerTxProxy, key),
willThrowWithCauseOrSuppressed(ctx.expectedErr));
} else {
assertThat(ctx.put.apply(client(), olderTxProxy, key2),
willSucceedFast()); // Will invalidate younger tx.
+
+ await().atMost(2, TimeUnit.SECONDS).until(() ->
youngerTxProxy.startedTx().killed());
assertThat(youngerTxProxy.commitAsync(),
willThrowWithCauseOrSuppressed(TransactionException.class));
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index b368b049fe2..78b30f3abaa 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -89,10 +89,20 @@ public class ClientTransaction implements Transaction {
/** Node-local resource id for the Transaction. */
private final long id;
- /** The future used on repeated commit/rollback. */
+ /** The future is used on repeated direct (via API) commit/rollback. */
@IgniteToStringExclude
private volatile CompletableFuture<Void> finishFut;
+ /**
+ * The future is used to track outcome of the implicit rollback operation
(for example, a transaction is killed).
+ * There is a scenario, when both futures are required.
+ * 1. implicit rollback is in progress
+ * 2. commit is called first time - an implicit rollback future outcome
should be reported
+ * 3. commit is called second time - a completed future should be
reported (this is a contract both for subsequent commit/rollback)
+ */
+ @IgniteToStringExclude
+ private volatile CompletableFuture<Void> implicitRollbackFut;
+
/** State. */
private final AtomicInteger state = new AtomicInteger(STATE_OPEN);
@@ -246,9 +256,13 @@ public class ClientTransaction implements Transaction {
try {
if (finishFut != null) {
return finishFut;
+ } else if (implicitRollbackFut == null) {
+ implicitRollbackFut = new CompletableFuture<>();
} else {
- finishFut = new CompletableFuture<>();
+ return implicitRollbackFut;
}
+
+ setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
} finally {
enlistPartitionLock.writeLock().unlock();
}
@@ -264,17 +278,19 @@ public class ClientTransaction implements Transaction {
// It's safe to rollback proxy and direct parts of transactions
concurrently.
// Write intent resolution will ignore WIs from PENDING transactions.
- return CompletableFuture.allOf(rollbackFut,
sendDiscardRequests()).handle((r, e) -> {
- setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+ return CompletableFuture.allOf(rollbackFut,
sendDiscardRequests()).whenComplete((r, e) -> {
ch.inflights().erase(txId());
- this.finishFut.complete(null);
- return null;
+ TransactionException ex = exceptionForState(state.get(), this);
+
+ if (e != null) {
+ ex.addSuppressed(e);
+ }
+
+ this.implicitRollbackFut.completeExceptionally(ex);
});
}
private CompletableFuture<Void> sendDiscardRequests() {
- assert finishFut != null;
-
if
(!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
return nullCompletedFuture();
}
@@ -326,6 +342,9 @@ public class ClientTransaction implements Transaction {
try {
if (finishFut != null) {
return finishFut;
+ } else if (implicitRollbackFut != null) {
+ finishFut = nullCompletedFuture();
+ return implicitRollbackFut;
} else {
finishFut = new CompletableFuture<>();
}
@@ -363,14 +382,14 @@ public class ClientTransaction implements Transaction {
// Failed to commit for some reason, need to discard direct
mappings.
Throwable cause = ExceptionUtils.unwrapCause(e);
- sendDiscardRequests().handle((r, e0) -> {
- setState(cause instanceof ClientTransactionKilledException
? STATE_KILLED : STATE_ROLLED_BACK);
+ setState(cause instanceof ClientTransactionKilledException ?
STATE_KILLED : STATE_ROLLED_BACK);
+
+ sendDiscardRequests().whenComplete((r, e0) -> {
ch.inflights().erase(txId());
this.finishFut.complete(null);
- return null;
});
- return null;
+ throw sneakyThrow(e); // Fail commit future.
}
setState(STATE_COMMITTED);
@@ -420,9 +439,14 @@ public class ClientTransaction implements Transaction {
try {
if (finishFut != null) {
return finishFut;
+ } else if (implicitRollbackFut != null) {
+ finishFut = nullCompletedFuture();
+ return implicitRollbackFut;
} else {
finishFut = new CompletableFuture<>();
}
+
+ setState(STATE_ROLLED_BACK);
} finally {
enlistPartitionLock.writeLock().unlock();
}
@@ -437,7 +461,6 @@ public class ClientTransaction implements Transaction {
}, r -> null);
mainFinishFut.handle((res, e) -> {
- setState(STATE_ROLLED_BACK);
ch.inflights().erase(txId());
this.finishFut.complete(null);
return null;
@@ -596,7 +619,7 @@ public class ClientTransaction implements Transaction {
}
private void checkEnlistPossible() {
- if (finishFut != null) {
+ if (finishFut != null || implicitRollbackFut != null) {
throw exceptionForState(state.get(), this);
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index 0366339f2d9..2167cf8f6ff 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -224,12 +224,22 @@ public class TxFinishReplicaRequestHandler {
.map(entry -> new EnlistedPartitionGroup(entry.getKey(),
entry.getValue().tableIds()))
.collect(toList());
return finishTransaction(enlistedPartitionGroups, txId, commit,
commitTimestamp)
- .thenCompose(txResult -> {
+ .thenApply(txResult -> {
boolean actualCommit = txResult.transactionState() ==
COMMITTED;
HybridTimestamp actualCommitTs =
txResult.commitTimestamp();
- return txManager.cleanup(replicationGroupId,
enlistedPartitions, actualCommit, actualCommitTs, txId)
- .thenApply(v -> txResult);
+ try {
+ txManager.cleanup(replicationGroupId,
enlistedPartitions, actualCommit, actualCommitTs, txId)
+ .whenComplete((r, e) -> {
+ if (e != null) {
+ LOG.warn("Failed to cleanup a
transaction [id={}]", txId, e);
+ }
+ });
+ } catch (Exception e) {
+ LOG.warn("Failed to cleanup a transaction [id={}]",
txId, e);
+ }
+
+ return txResult;
});
}
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 0dd1905754a..b6621253226 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -72,6 +72,7 @@ import
org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.flow.TestFlowUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -281,7 +282,7 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
rwTx.rollback();
-
assertFalse(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
+ assertTrue(waitForCondition(() ->
CollectionUtils.nullOrEmpty(primaryIgnite.txManager().lockManager().locks(rwTx.id())),
2000));
assertEquals(3, partitionStorage.pendingCursors() +
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
index f59d95f8215..867047cc7a5 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
@@ -86,6 +86,9 @@ public class ItOrToUnionRuleTest extends
BaseSqlIntegrationTest {
{23, null, 0, null, 41, null},
});
+ // TODO IGNITE-28578
+ assertQuery("select count(*) from products").returns(23L).check();
+
gatherStatistics();
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
index a5de447c11e..2048fcb073a 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.is;
@@ -31,7 +32,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -150,7 +150,7 @@ public abstract class ItTxAbstractDistributedTestSingleNode
extends TxAbstractTe
assertThat(futFinishes, willSucceedFast());
assertThat(futEnlists, willSucceedFast());
-
assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
+ await().atMost(2, TimeUnit.SECONDS).until(() ->
CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
}
/**
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index a68006cc8e0..c5bd9bf2765 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -206,6 +206,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
txManager,
txManager.lockManager(),
Runnable::run,
+ Runnable::run,
replicationGroupId,
tableId,
indexesLockers,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
index a541025d562..68c00522e1d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
@@ -285,6 +285,7 @@ class TablePartitionResourcesFactory {
txManager,
lockManager,
scanRequestExecutor,
+ partitionOperationsExecutor,
replicationGroupId,
table.tableId(),
table.indexesLockers(partitionIndex),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
index 96ede9bae0f..127aca54b76 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
@@ -52,6 +52,7 @@ import static
org.apache.ignite.internal.tx.impl.TxStateResolutionParameters.txS
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.CollectionUtils.view;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
@@ -83,10 +84,15 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTuple;
@@ -199,8 +205,10 @@ import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
import
org.apache.ignite.internal.tx.PrimaryReplicaChangeDuringWriteIntentResolutionException;
+import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxPriority;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.UpdateCommandResult;
@@ -254,6 +262,8 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
@SuppressWarnings("unused") // We use it as a placeholder of a
documentation which can be linked using # and @see.
private static final Object INTERNAL_DOC_PLACEHOLDER = null;
+ private static final int WAIT_FOR_CLEANUP_TIMEOUT_MILLIS = 500;
+
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(DefaultTablePartitionReplicaProcessor.class);
@@ -304,6 +314,8 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
/** Runs async scan tasks for effective tail recursion execution (avoid
deep recursive calls). */
private final Executor scanRequestExecutor;
+ private final Executor partitionOperationsExecutor;
+
private final Supplier<Int2ObjectMap<IndexLocker>> indexesLockers;
/** Used to handle race between concurrent rollback and enlist. */
@@ -312,6 +324,9 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
/** Cleanup futures. */
private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap
= new ConcurrentHashMap<>();
+ /** Pending transactions. */
+ private final ConcurrentNavigableMap<UUID, PendingTxContext>
pendingTransactions = new ConcurrentSkipListMap<>();
+
private final SchemaCompatibilityValidator schemaCompatValidator;
private final SchemaSyncService schemaSyncService;
@@ -350,6 +365,8 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
* @param raftCommandRunner Raft client.
* @param txManager Transaction manager.
* @param lockManager Lock manager.
+ * @param scanRequestExecutor Scan executor.
+ * @param partitionOperationsExecutor Partition executor.
* @param replicationGroupId Replication group id.
* @param tableId Table id.
* @param indexesLockers Index lock helper objects.
@@ -375,6 +392,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
TxManager txManager,
LockManager lockManager,
Executor scanRequestExecutor,
+ Executor partitionOperationsExecutor,
ZonePartitionId replicationGroupId,
int tableId,
Supplier<Int2ObjectMap<IndexLocker>> indexesLockers,
@@ -401,6 +419,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
this.txManager = txManager;
this.lockManager = lockManager;
this.scanRequestExecutor = scanRequestExecutor;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
this.indexesLockers = indexesLockers;
this.pkIndexStorage = pkIndexStorage;
this.secondaryIndexStorages = secondaryIndexStorages;
@@ -560,7 +579,8 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
}
if (request instanceof GetEstimatedSizeRequest) {
- return processGetEstimatedSizeRequest();
+ return resolvePendingTransactions(((GetEstimatedSizeRequest)
request).timestamp(), safeTime.current(),
+ unused -> completedFuture(mvDataStorage.estimatedSize()));
}
HybridTimestamp opTs =
tableAwareReplicaRequestPreProcessor.getOperationTimestamp(request);
@@ -569,10 +589,6 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
return processOperationRequestWithTxOperationManagementLogic(request,
replicaPrimacy, opTsIfDirectRo);
}
- private CompletableFuture<Long> processGetEstimatedSizeRequest() {
- return completedFuture(mvDataStorage.estimatedSize());
- }
-
private void replicaTouch(UUID txId, UUID coordinatorId, ZonePartitionId
commitPartitionId, @Nullable String txLabel) {
txManager.updateTxMeta(txId, old -> builder(old, PENDING)
.txCoordinatorId(coordinatorId)
@@ -581,6 +597,80 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
.build());
}
+ private <T> CompletableFuture<T> resolvePendingTransactions(
+ HybridTimestamp requestTime,
+ HybridTimestamp currentSafeTime,
+ Function<HybridTimestamp, CompletableFuture<T>> action
+ ) {
+ // Wait for currentSafeTime >= requestTime to avoid out-of-order
transactions arrival.
+ if (currentSafeTime.compareTo(requestTime) < 0) {
+ return safeTime.waitFor(requestTime)
+ .thenComposeAsync(
+ unused -> resolvePendingTransactions(requestTime,
safeTime.current(), action),
+ partitionOperationsExecutor);
+ }
+
+ assert currentSafeTime.compareTo(requestTime) >= 0 : "currentSafeTime
< lowerBoundTimestamp";
+ assert currentSafeTime.compareTo(safeTime.current()) <= 0 :
"currentSafeTime > safeTime";
+
+ // Stable committed snapshot is ensured after resolving pending
transactions state.
+ UUID upperBoundTxId = TransactionIds.transactionId(currentSafeTime,
Integer.MAX_VALUE, TxPriority.NORMAL);
+ ConcurrentNavigableMap<UUID, PendingTxContext> txToWait =
pendingTransactions.headMap(upperBoundTxId, true);
+
+ if (!txToWait.isEmpty()) {
+ List<CompletableFuture<?>> futs = null;
+
+ for (Map.Entry<UUID, PendingTxContext> entry :
txToWait.entrySet()) {
+ if (!entry.getValue().cleanupFut.isDone()) {
+ futs = futs == null ? new ArrayList<>() : futs;
+ futs.add(resolveTransactionState(entry.getKey(),
entry.getValue(), currentSafeTime));
+ }
+ }
+
+ if (futs != null) {
+ return allOf(futs.toArray(CompletableFuture[]::new))
+ .thenComposeAsync(unused ->
action.apply(currentSafeTime), partitionOperationsExecutor);
+ }
+ }
+
+ return action.apply(currentSafeTime);
+ }
+
+ private CompletableFuture<Void> resolveTransactionState(UUID txId,
PendingTxContext txCtx, HybridTimestamp observableTimestamp) {
+ CompletableFuture<Void> resFut = new CompletableFuture<>();
+
+ txCtx.cleanupFut.whenComplete(copyStateTo(resFut));
+
+ return resFut.orTimeout(WAIT_FOR_CLEANUP_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS).handle((ignored, e) -> {
+ if (e == null) {
+ return CompletableFutures.<Void>nullCompletedFuture();
+ }
+
+ if (e instanceof TimeoutException) {
+ // Transaction was not cleaned up in time.
+ // Send tx state request to coordinator to bump commit
timestamp for active txn beyond safe timestamp.
+ return
transactionStateResolver.resolveTxState(txStateResolutionParameters()
+ .txId(txId)
+ .commitGroupId(txCtx.commitPartId)
+ .readTimestamp(observableTimestamp)
+ .build())
+ .thenCompose(r -> {
+ if (r.txState() == PENDING) {
+ // Safe to ignore this transaction, it will be
committed with a higher timestamp.
+ return nullCompletedFuture();
+ }
+
+ // Wait for original future relying on durable
finish to do cleanup in case of failures.
+ return txCtx.cleanupFut;
+ });
+ }
+
+ sneakyThrow(e);
+
+ return null;
+ }).thenCompose(Function.identity());
+ }
+
private static void setDelayedAckProcessor(@Nullable ReplicaResult result,
@Nullable BiConsumer<Object, Throwable> proc) {
if (result != null) {
result.delayedAckProcessor = proc;
@@ -621,6 +711,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
private CompletableFuture<?>
processRwSingleRowRequest(ReadWriteSingleRowReplicaRequest req, ReplicaPrimacy
primacy) {
return appendTxCommand(
req.transactionId(),
+ req.commitPartitionId().asZonePartitionId(),
req.requestType(),
req.full(),
() -> processSingleEntryAction(req,
primacy.leaseStartTime()).whenComplete(
@@ -631,6 +722,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
private CompletableFuture<?>
processRwSingleRowPkRequest(ReadWriteSingleRowPkReplicaRequest req,
ReplicaPrimacy primacy) {
return appendTxCommand(
req.transactionId(),
+ req.commitPartitionId().asZonePartitionId(),
req.requestType(),
req.full(),
() -> processSingleEntryAction(req,
primacy.leaseStartTime()).whenComplete(
@@ -641,6 +733,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
private CompletableFuture<?>
processRwMultiRowRequest(ReadWriteMultiRowReplicaRequest req, ReplicaPrimacy
primacy) {
return appendTxCommand(
req.transactionId(),
+ req.commitPartitionId().asZonePartitionId(),
req.requestType(),
req.full(),
() -> processMultiEntryAction(req,
primacy.leaseStartTime()).whenComplete(
@@ -651,6 +744,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
private CompletableFuture<?>
processRwMultiRowPkRequest(ReadWriteMultiRowPkReplicaRequest req,
ReplicaPrimacy primacy) {
return appendTxCommand(
req.transactionId(),
+ req.commitPartitionId().asZonePartitionId(),
req.requestType(),
req.full(),
() -> processMultiEntryAction(req,
primacy.leaseStartTime()).whenComplete(
@@ -661,6 +755,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
private CompletableFuture<?>
processRwSwapRowRequest(ReadWriteSwapRowReplicaRequest req, ReplicaPrimacy
primacy) {
return appendTxCommand(
req.transactionId(),
+ req.commitPartitionId().asZonePartitionId(),
req.requestType(),
req.full(),
() -> processTwoEntriesAction(req,
primacy.leaseStartTime()).whenComplete(
@@ -677,7 +772,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
replicaTouch(req.transactionId(), req.coordinatorId(),
req.commitPartitionId().asZonePartitionId(), req.txLabel());
// Implicit RW scan can be committed locally on a last batch or error.
- return appendTxCommand(req.transactionId(), RW_SCAN, false, () ->
processScanRetrieveBatchAction(req))
+ return appendTxCommand(req.transactionId(), null, RW_SCAN, false, ()
-> processScanRetrieveBatchAction(req))
.thenCompose(rows -> {
if (allElementsAreNull(rows)) {
return completedFuture(rows);
@@ -1534,6 +1629,14 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
applyWriteIntentSwitchCommandLocally(request);
}
+ partitionOperationsExecutor.execute(() -> {
+ PendingTxContext txCtx =
pendingTransactions.remove(request.txId());
+
+ if (txCtx != null) {
+ txCtx.cleanupFut.complete(null);
+ }
+ });
+
return new ReplicaResult(res, null);
});
}
@@ -1669,6 +1772,7 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
* Appends an operation to prevent the race between commit/rollback and
the operation execution.
*
* @param txId Transaction id.
+ * @param commitPartitionId Commit partition id.
* @param requestType Request type.
* @param full {@code True} if a full transaction and can be immediately
committed.
* @param op Operation closure.
@@ -1676,10 +1780,15 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
*/
private <T> CompletableFuture<T> appendTxCommand(
UUID txId,
+ @Nullable ZonePartitionId commitPartitionId,
RequestType requestType,
boolean full,
Supplier<CompletableFuture<T>> op
) {
+ PendingTxContext txCtx = requestType.isRwRead() ? null :
+ pendingTransactions.computeIfAbsent(txId,
+ id -> new PendingTxContext(new CompletableFuture<>(),
commitPartitionId));
+
if (full) {
AtomicReference<CompletableFuture<T>> futRef = new
AtomicReference<>();
@@ -1690,6 +1799,14 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
releaseTxLocks(txId);
// Drop volatile state.
txManager.updateTxMeta(txId, ignored -> null);
+
+ partitionOperationsExecutor.execute(() -> {
+ // We don't care about errors for pending tx purposes.
+ if (!requestType.isRwRead()) {
+ txCtx.cleanupFut.complete(null);
+ pendingTransactions.remove(txId);
+ }
+ });
});
}
@@ -1714,6 +1831,9 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
Throwable publicCause = isFinishedDueToError ? cause : null;
Integer causeErrorCode = txStateMeta == null ? null :
txStateMeta.lastExceptionErrorCode();
+ // Reduce heap usage.
+ pendingTransactions.remove(txId);
+
return failedFuture(new TransactionException(
finishedTransactionErrorCode(isFinishedDueToTimeout,
isFinishedDueToError),
format(finishedTransactionErrorMessage(
@@ -4085,4 +4205,20 @@ public class DefaultTablePartitionReplicaProcessor
implements TablePartitionRepl
this.transactionMeta = transactionMeta;
}
}
+
+ /**
+ * Represents waiting for cleanup transactions context.
+ */
+ private static class PendingTxContext {
+ final CompletableFuture<Void> cleanupFut;
+ final ZonePartitionId commitPartId;
+
+ PendingTxContext(CompletableFuture<Void> cleanupFut, ZonePartitionId
commitPartId) {
+ assert cleanupFut != null;
+ assert commitPartId != null;
+
+ this.cleanupFut = cleanupFut;
+ this.commitPartId = commitPartId;
+ }
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
index a5760940cb0..340dcb33ced 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
@@ -264,6 +264,7 @@ public class
DefaultTablePartitionReplicaProcessorIndexLockingTest extends Ignit
newTxManager(),
LOCK_MANAGER,
Runnable::run,
+ Runnable::run,
new ZonePartitionId(ZONE_ID, PART_ID),
TABLE_ID,
() -> Int2ObjectMap.ofEntries(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
index 4c429dc7d2b..39b2eb07231 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
@@ -234,6 +234,7 @@ public class
DefaultTablePartitionReplicaProcessorSortedIndexLockingTest extends
newTxManager(),
LOCK_MANAGER,
Runnable::run,
+ Runnable::run,
new ZonePartitionId(ZONE_ID, PART_ID),
TABLE_ID,
() -> Int2ObjectMaps.singleton(pkLocker.id(), pkLocker),
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
index db7cdb543aa..453235e9db2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
@@ -663,6 +663,7 @@ public class DefaultTablePartitionReplicaProcessorTest
extends IgniteAbstractTes
txManager,
lockManager,
Runnable::run,
+ Runnable::run,
new ZonePartitionId(tableDescriptor.zoneId(), PART_ID),
TABLE_ID,
() -> Int2ObjectMap.ofEntries(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index 202bf8cdf8e..58d8b06a7df 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -668,6 +668,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
txManager,
lockManager,
Runnable::run,
+ Runnable::run,
new ZonePartitionId(tableDescriptor.zoneId(), PART_ID),
TABLE_ID,
() -> Int2ObjectMap.ofEntries(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index cb0754e9567..5579a2b67a9 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -327,13 +327,14 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
txManager,
lockManager,
ForkJoinPool.commonPool(),
+ ForkJoinPool.commonPool(),
new ZonePartitionId(ZONE_ID, partId),
TABLE_ID,
Int2ObjectMaps::emptyMap,
new Lazy<>(() -> null),
Int2ObjectMaps::emptyMap,
clockService,
- new
PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE),
+ new
PendingComparableValuesTracker<>(HybridTimestamp.MAX_VALUE),
transactionStateResolver,
storageUpdateHandler,
validationSchemasSource,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 007b0f5cea4..e0fecc0761a 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -1110,6 +1110,7 @@ public class ItTxTestCluster {
txManager,
txManager.lockManager(),
Runnable::run,
+ Runnable::run,
replicationGroupId,
tableId,
indexesLockers,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index ecea6c51af1..d57044f6adf 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -1893,7 +1893,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
}
@Test
- public void testTransactionAlreadyCommitted() {
+ public void testTransactionAlreadyCommitted() throws InterruptedException {
testTransactionAlreadyFinished(true, true, (transaction, uuid) -> {
transaction.commit();
@@ -1902,7 +1902,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
}
@Test
- public void testTransactionAlreadyRolledback() {
+ public void testTransactionAlreadyRolledback() throws InterruptedException
{
testTransactionAlreadyFinished(false, true, (transaction, uuid) -> {
transaction.rollback();
@@ -2177,7 +2177,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
boolean commit,
boolean checkLocks,
BiConsumer<Transaction, UUID> finisher
- ) {
+ ) throws InterruptedException {
Transaction tx = igniteTransactions.begin();
var txId = ((ReadWriteTransactionImpl) tx).id();
@@ -2204,7 +2204,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertThrowsTxFinishedException(() -> accountsRv.upsert(tx,
makeValue(2, 300.)));
if (checkLocks) {
-
assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
+ assertTrue(waitForCondition(() ->
CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)),
2000));
}
if (commit) {
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index de75440356a..3097c49cf0b 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -486,6 +486,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
this.txManager,
this.txManager.lockManager(),
Runnable::run,
+ Runnable::run,
zonePartitionId,
tableId,
() -> Int2ObjectMaps.singleton(pkLocker.id(), pkLocker),
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 277da8a2a92..6973594f3ef 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -235,7 +235,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
* @param commit Commit flag.
* @param executionTimestamp The timestamp is the time when the
transaction is applied to the remote node.
* @param full Full state transaction marker.
- * @param isComplete The flag is true if the transaction is completed
through the public API, false for {@link this#kill()} invocation.
+ * @param isFinishedDirectly {@code True} if the transaction is completed
through the public API, false for {@link #kill()}.
* @param finishReason Optional finish reason (for example, timeout). Must
be {@code null} for commit.
* @return The future.
*/
@@ -243,7 +243,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
boolean commit,
@Nullable HybridTimestamp executionTimestamp,
boolean full,
- boolean isComplete,
+ boolean isFinishedDirectly,
@Nullable Throwable finishReason
) {
enlistPartitionLock.writeLock().lock();
@@ -251,7 +251,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
try {
if (finishFuture == null) {
if (killed) {
- if (isComplete) {
+ if (isFinishedDirectly) {
// An attempt to finish a killed transaction.
finishFuture = nullCompletedFuture();
@@ -266,7 +266,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
CompletableFuture<Void> finishFutureInternal =
txManager.finishFull(observableTsTracker, id(),
executionTimestamp, commit, finishReason);
- if (isComplete) {
+ if (isFinishedDirectly) {
finishFuture = finishFutureInternal.handle((unused,
throwable) -> null);
this.timeoutExceeded =
isFinishedDueToTimeout(finishReason);
} else {
@@ -278,7 +278,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
return finishFutureInternal;
} else {
- killed = !isComplete;
+ killed = !isFinishedDirectly;
CompletableFuture<Void> finishFutureInternal =
txManager.finish(
observableTsTracker,
@@ -291,7 +291,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
id()
);
- if (isComplete) {
+ if (isFinishedDirectly) {
finishFuture = finishFutureInternal.handle((unused,
throwable) -> null);
this.timeoutExceeded =
isFinishedDueToTimeout(finishReason);
} else {