This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c02b3ea7eb IGNITE-18617 Clear rw tx context and cleanup ready futures on tx finish (#3436)
c02b3ea7eb is described below

commit c02b3ea7eb15fd62bb48cac527fa49b700accf57
Author: Cyrill <cyrill.si...@gmail.com>
AuthorDate: Thu Mar 21 17:45:09 2024 +0300

    IGNITE-18617 Clear rw tx context and cleanup ready futures on tx finish (#3436)
---
 .../internal/testframework/IgniteTestUtils.java      | 11 ++++++++++-
 .../replicator/PartitionReplicaListener.java         | 20 ++++++++++++--------
 .../internal/tx/impl/TransactionInflights.java       | 13 +++++++++----
 .../ignite/internal/tx/impl/TxManagerImpl.java       |  2 +-
 4 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index f6aa4ce8c8..1b76ccef0f 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -289,7 +289,16 @@ public final class IgniteTestUtils {
         try {
             run.execute();
         } catch (Throwable throwable) {
-            assertInstanceOf(expectedClass, throwable);
+            try {
+                assertInstanceOf(expectedClass, throwable);
+            } catch (AssertionError err) {
+                // An AssertionError from assertInstanceOf has nothing but a class name of the original exception.
+                AssertionError assertionError = new AssertionError(err);
+
+                assertionError.addSuppressed(throwable);
+
+                throw assertionError;
+            }
 
             IgniteException igniteException = (IgniteException) throwable;
             assertEquals(expectedErrorCode, igniteException.code());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b16202cabf..755992f497 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1730,7 +1730,6 @@ public class PartitionReplicaListener implements ReplicaListener {
         List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
         List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-18617
         txCleanupReadyFutures.compute(txId, (id, txOps) -> {
             if (txOps == null) {
                 return null;
@@ -1746,7 +1745,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
             txOps.futures.clear();
 
-            return txOps;
+            return null;
         });
 
         return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
@@ -1894,18 +1893,23 @@ public class PartitionReplicaListener implements ReplicaListener {
         var cleanupReadyFut = new CompletableFuture<Void>();
 
         txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-            if (txOps == null) {
-                txOps = new TxCleanupReadyFutureList();
-            }
-
+            // First check whether the transaction has already been finished.
+            // And complete cleanupReadyFut with exception if it is the case.
             TxStateMeta txStateMeta = txManager.stateMeta(txId);
 
             if (txStateMeta == null || isFinalState(txStateMeta.txState())) {
                 cleanupReadyFut.completeExceptionally(new Exception());
-            } else {
-                txOps.futures.computeIfAbsent(cmdType, type -> new ArrayList<>()).add(cleanupReadyFut);
+
+                return txOps;
+            }
+
+            // Otherwise collect cleanupReadyFut in the transaction's futures.
+            if (txOps == null) {
+                txOps = new TxCleanupReadyFutureList();
             }
 
+            txOps.futures.computeIfAbsent(cmdType, type -> new ArrayList<>()).add(cleanupReadyFut);
+
             return txOps;
         });
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index b303532fe9..bdbe1d0ab2 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -87,16 +87,17 @@ public class TransactionInflights {
      * @param txId The transaction id.
      */
     public void removeInflight(UUID txId) {
-        TxContext tuple = txCtxMap.compute(txId, (uuid, ctx) -> {
-            assert ctx != null : format("No tx context found on removing inflight [txId={}]", txId);
-
+        // Can be null if tx was aborted and inflights were removed from the collection.
+        TxContext tuple = txCtxMap.computeIfPresent(txId, (uuid, ctx) -> {
             ctx.removeInflight(txId);
 
             return ctx;
         });
 
         // Avoid completion under lock.
-        tuple.onInflightsRemoved();
+        if (tuple != null) {
+            tuple.onInflightsRemoved();
+        }
     }
 
     Collection<UUID> finishedReadOnlyTransactions() {
@@ -106,6 +107,10 @@ public class TransactionInflights {
                 .collect(toSet());
     }
 
+    void removeTxContext(UUID txId) {
+        txCtxMap.remove(txId);
+    }
+
     void removeTxContexts(Collection<UUID> txIds) {
         txCtxMap.keySet().removeAll(txIds);
     }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 2f0c40e4de..156d3bf50a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -517,7 +517,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
             if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
                 decrementRwTxCount(txId);
             }
-        });
+        }).whenComplete((unused, throwable) -> transactionInflights.removeTxContext(txId));
     }
 
     private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID txId, TransactionMeta stateMeta) {

Reply via email to