This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c02b3ea7eb IGNITE-18617 Clear rw tx context and cleanup ready futures on tx finish (#3436) c02b3ea7eb is described below commit c02b3ea7eb15fd62bb48cac527fa49b700accf57 Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Thu Mar 21 17:45:09 2024 +0300 IGNITE-18617 Clear rw tx context and cleanup ready futures on tx finish (#3436) --- .../internal/testframework/IgniteTestUtils.java | 11 ++++++++++- .../replicator/PartitionReplicaListener.java | 20 ++++++++++++-------- .../internal/tx/impl/TransactionInflights.java | 13 +++++++++---- .../ignite/internal/tx/impl/TxManagerImpl.java | 2 +- 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java index f6aa4ce8c8..1b76ccef0f 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java @@ -289,7 +289,16 @@ public final class IgniteTestUtils { try { run.execute(); } catch (Throwable throwable) { - assertInstanceOf(expectedClass, throwable); + try { + assertInstanceOf(expectedClass, throwable); + } catch (AssertionError err) { + // An AssertionError from assertInstanceOf has nothing but a class name of the original exception. + AssertionError assertionError = new AssertionError(err); + + assertionError.addSuppressed(throwable); + + throw assertionError; + } IgniteException igniteException = (IgniteException) throwable; assertEquals(expectedErrorCode, igniteException.code()); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index b16202cabf..755992f497 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1730,7 +1730,6 @@ public class PartitionReplicaListener implements ReplicaListener { List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>(); List<CompletableFuture<?>> txReadFutures = new ArrayList<>(); - // TODO https://issues.apache.org/jira/browse/IGNITE-18617 txCleanupReadyFutures.compute(txId, (id, txOps) -> { if (txOps == null) { return null; @@ -1746,7 +1745,7 @@ public class PartitionReplicaListener implements ReplicaListener { txOps.futures.clear(); - return txOps; + return null; }); return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId) @@ -1894,18 +1893,23 @@ public class PartitionReplicaListener implements ReplicaListener { var cleanupReadyFut = new CompletableFuture<Void>(); txCleanupReadyFutures.compute(txId, (id, txOps) -> { - if (txOps == null) { - txOps = new TxCleanupReadyFutureList(); - } - + // First check whether the transaction has already been finished. + // And complete cleanupReadyFut with exception if it is the case. TxStateMeta txStateMeta = txManager.stateMeta(txId); if (txStateMeta == null || isFinalState(txStateMeta.txState())) { cleanupReadyFut.completeExceptionally(new Exception()); - } else { - txOps.futures.computeIfAbsent(cmdType, type -> new ArrayList<>()).add(cleanupReadyFut); + + return txOps; + } + + // Otherwise collect cleanupReadyFut in the transaction's futures. + if (txOps == null) { + txOps = new TxCleanupReadyFutureList(); } + txOps.futures.computeIfAbsent(cmdType, type -> new ArrayList<>()).add(cleanupReadyFut); + return txOps; }); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java index b303532fe9..bdbe1d0ab2 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java @@ -87,16 +87,17 @@ public class TransactionInflights { * @param txId The transaction id. */ public void removeInflight(UUID txId) { - TxContext tuple = txCtxMap.compute(txId, (uuid, ctx) -> { - assert ctx != null : format("No tx context found on removing inflight [txId={}]", txId); - + // Can be null if tx was aborted and inflights were removed from the collection. + TxContext tuple = txCtxMap.computeIfPresent(txId, (uuid, ctx) -> { ctx.removeInflight(txId); return ctx; }); // Avoid completion under lock. - tuple.onInflightsRemoved(); + if (tuple != null) { + tuple.onInflightsRemoved(); + } } Collection<UUID> finishedReadOnlyTransactions() { @@ -106,6 +107,10 @@ public class TransactionInflights { .collect(toSet()); } + void removeTxContext(UUID txId) { + txCtxMap.remove(txId); + } + void removeTxContexts(Collection<UUID> txIds) { txCtxMap.keySet().removeAll(txIds); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 2f0c40e4de..156d3bf50a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -517,7 +517,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) { decrementRwTxCount(txId); } - }); + }).whenComplete((unused, throwable) -> transactionInflights.removeTxContext(txId)); } private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID txId, TransactionMeta stateMeta) {