This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c7cf15e52 IGNITE-21071 Rollback the transaction on primary failure if replication is not finished (#3030)
4c7cf15e52 is described below

commit 4c7cf15e520eee0097712e78591150981307dac6
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Thu Jan 18 19:50:49 2024 +0300

    IGNITE-21071 Rollback the transaction on primary failure if replication is not finished (#3030)
---
 .../ignite/client/handler/FakePlacementDriver.java |  11 +
 .../internal/compute/ItWorkerShutdownTest.java     |  14 +-
 .../internal/index/IndexBuildControllerTest.java   |   5 +
 .../internal/placementdriver/PlacementDriver.java  |  10 +
 .../placementdriver/TestPlacementDriver.java       |   5 +
 .../placementdriver/leases/LeaseTracker.java       |  64 +++--
 .../ignite/internal/raft/RaftGroupServiceImpl.java |   9 +-
 .../ignite/internal/table/ItDurableFinishTest.java |  19 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   7 +
 .../java/org/apache/ignite/internal/Cluster.java   |  25 ++
 .../internal/ClusterPerTestIntegrationTest.java    |  17 ++
 .../ignite/internal/table/ItColocationTest.java    |   2 +-
 .../internal/table/ItTransactionRecoveryTest.java  | 300 ++++++++++++++-------
 .../ignite/internal/table/TxAbstractTest.java      |  16 +-
 .../tx/TransactionAlreadyFinishedException.java    |   8 +-
 .../tx/impl/PrimaryReplicaExpiredException.java    |   2 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 158 ++++++++---
 .../tx/AbstractDeadlockPreventionTest.java         |  17 ++
 .../apache/ignite/internal/tx/TxManagerTest.java   |  15 +-
 19 files changed, 532 insertions(+), 172 deletions(-)

diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 5853de37f1..e110c38fe7 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -103,6 +103,17 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         return nullCompletedFuture();
     }
 
+    @Override
+    public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+        TablePartitionId id = (TablePartitionId) groupId;
+
+        if (returnError) {
+            throw new RuntimeException("FakePlacementDriver expected error");
+        } else {
+            return primaryReplicas.get(id.partitionId());
+        }
+    }
+
     private static ReplicaMeta getReplicaMeta(String leaseholder, long 
leaseStartTime) {
         //noinspection serial
         return new ReplicaMeta() {
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
index 2dcac4163c..307ae2c627 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
@@ -223,7 +223,7 @@ class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
         UUID jobIdBeforeFail = idSync(execution);
 
         // When stop worker node.
-        stopNode(workerNodeName);
+        stopNodeByName(workerNodeName);
         // And remove it from candidates.
         remoteWorkerCandidates.remove(workerNodeName);
 
@@ -273,7 +273,7 @@ class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
         checkGlobalInteractiveJobAlive(execution);
 
         // When stop worker node.
-        stopNode(workerNodeName);
+        stopNodeByName(workerNodeName);
 
         // Then the job is failed, because there is no any failover worker.
         assertThat(execution.resultAsync(), willThrow(IgniteException.class));
@@ -295,7 +295,7 @@ class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
         assertThat(getWorkerNodeNameFromGlobalInteractiveJob(), 
equalTo(entryNode.name()));
 
         // When stop entry node.
-        stopNode(entryNode.name());
+        stopNodeByName(entryNode.name());
 
         // Then the job is failed, because there is no any failover worker.
         assertThat(execution.resultAsync().isCompletedExceptionally(), 
equalTo(true));
@@ -321,7 +321,7 @@ class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
         executions.forEach(ItWorkerShutdownTest::checkInteractiveJobAlive);
 
         // When stop one of workers.
-        stopNode(node(1).name());
+        stopNodeByName(node(1).name());
 
         // Then two jobs are alive.
         executions.forEach((node, execution) -> {
@@ -355,7 +355,7 @@ class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
         checkGlobalInteractiveJobAlive(execution);
 
         // When stop worker node.
-        stopNode(workerNodeName);
+        stopNodeByName(workerNodeName);
         // And remove it from candidates.
         remoteWorkerCandidates.remove(workerNodeName);
 
@@ -417,10 +417,10 @@ class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
     }
 
     private void stopNode(IgniteImpl ignite) {
-        stopNode(ignite.name());
+        stopNodeByName(ignite.name());
     }
 
-    private void stopNode(String name) {
+    private void stopNodeByName(String name) {
         int ind = NODES_NAMES_TO_INDEXES.get(name);
         node(ind).stop();
     }
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 1b5d6f1169..2423e913f7 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -299,6 +299,11 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+            return primaryReplicaMetaFutureById.get(groupId).join();
+        }
+
         CompletableFuture<Void> setPrimaryReplicaMeta(
                 long causalityToken,
                 TablePartitionId replicaId,
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index 73d887cbb4..fbc3d564c7 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Service that provides an ability to await and retrieve primary replicas for 
replication groups.
@@ -77,4 +78,13 @@ public interface PlacementDriver extends 
EventProducer<PrimaryReplicaEvent, Prim
      * @return Future.
      */
     CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId);
+
+    /**
+     * Returns the current knowledge about the lease on the local node.
+     *
+     * @param groupId Group id.
+     * @return Current lease.
+     */
+    @Nullable
+    ReplicaMeta currentLease(ReplicationGroupId groupId);
 }
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 9fd5940041..9ea1b14ce0 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -87,6 +87,11 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         return nullCompletedFuture();
     }
 
+    @Override
+    public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+        return getReplicaMetaFuture().join();
+    }
+
     @Override
     public CompletableFuture<Void> fireEvent(PrimaryReplicaEvent event, 
PrimaryReplicaEventParameters parameters) {
         return super.fireEvent(event, parameters);
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index d44d491fd7..fc8e50e40c 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -135,6 +135,15 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
         return expirationFutureByGroup.getOrDefault(grpId, 
nullCompletedFuture());
     }
 
+    @Override
+    public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+        return inBusyLock(busyLock, () -> {
+            Lease lease = getLease(groupId);
+
+            return lease.isAccepted() ? lease : null;
+        });
+    }
+
     /**
      * Gets a lease for a particular group.
      *
@@ -162,6 +171,7 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
             return inBusyLockAsync(busyLock, () -> {
                 List<CompletableFuture<?>> fireEventFutures = new 
ArrayList<>();
+                List<Lease> expiredLeases = new ArrayList<>();
 
                 for (EntryEvent entry : event.entryEvents()) {
                     Entry msEntry = entry.newEntry();
@@ -188,18 +198,26 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
                             }
                         }
 
-                        firePrimaryReplicaExpiredEventIfNeeded(grpId, 
event.revision(), lease);
+                        if (needToFireEventReplicaExpired(grpId, lease)) {
+                            
expiredLeases.add(leases.leaseByGroupId().get(grpId));
+                        }
                     }
 
                     for (ReplicationGroupId grpId : 
leases.leaseByGroupId().keySet()) {
                         if (!leasesMap.containsKey(grpId)) {
                             tryRemoveTracker(grpId);
 
-                            firePrimaryReplicaExpiredEventIfNeeded(grpId, 
event.revision(), null);
+                            if (needToFireEventReplicaExpired(grpId, null)) {
+                                
expiredLeases.add(leases.leaseByGroupId().get(grpId));
+                            }
                         }
                     }
 
                     leases = new Leases(unmodifiableMap(leasesMap), 
leasesBytes);
+
+                    for (Lease expiredLease : expiredLeases) {
+                        firePrimaryReplicaExpiredEvent(event.revision(), 
expiredLease);
+                    }
                 }
 
                 return 
allOf(fireEventFutures.toArray(CompletableFuture[]::new));
@@ -308,10 +326,10 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
      * Fires the primary replica expire event if it needs.
      *
      * @param grpId Group id, used for the cases when the {@code lease} 
parameter is null. Should be always not null.
-     * @param causalityToken Causality token.
      * @param lease Lease to check on expiration.
+     * @return Whether the event is needed.
      */
-    private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId 
grpId, long causalityToken, @Nullable Lease lease) {
+    private boolean needToFireEventReplicaExpired(ReplicationGroupId grpId, 
@Nullable Lease lease) {
         assert lease == null || lease.replicationGroupId().equals(grpId)
                 : IgniteStringFormatter.format("Group id mismatch [groupId={}, 
lease={}]", grpId, lease);
 
@@ -321,20 +339,34 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
             boolean sameLease = lease != null && 
currentLease.getStartTime().equals(lease.getStartTime());
 
             if (!sameLease) {
-                CompletableFuture<Void> prev = 
expirationFutureByGroup.put(grpId, fireEvent(
-                        PRIMARY_REPLICA_EXPIRED,
-                        new PrimaryReplicaEventParameters(
-                                causalityToken,
-                                grpId,
-                                currentLease.getLeaseholderId(),
-                                currentLease.getLeaseholder(),
-                                currentLease.getStartTime()
-                        )
-                ));
-
-                assert prev == null || prev.isDone() : "Previous lease 
expiration process has not completed yet [grpId=" + grpId + ']';
+                return true;
             }
         }
+
+        return false;
+    }
+
+    /**
+     * Fires the primary replica expire event.
+     *
+     * @param causalityToken Causality token.
+     * @param expiredLease Expired lease.
+     */
+    private void firePrimaryReplicaExpiredEvent(long causalityToken, Lease 
expiredLease) {
+        ReplicationGroupId grpId = expiredLease.replicationGroupId();
+
+        CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, 
fireEvent(
+                PRIMARY_REPLICA_EXPIRED,
+                new PrimaryReplicaEventParameters(
+                        causalityToken,
+                        grpId,
+                        expiredLease.getLeaseholderId(),
+                        expiredLease.getLeaseholder(),
+                        expiredLease.getStartTime()
+                )
+        ));
+
+        assert prev == null || prev.isDone() : "Previous lease expiration 
process has not completed yet [grpId=" + grpId + ']';
     }
 
     private CompletableFuture<Void> fireEventReplicaBecomePrimary(long 
causalityToken, Lease lease) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index f0f3ec1304..1441ca84d7 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -587,15 +587,18 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
             CompletableFuture<? extends NetworkMessage> fut
     ) {
         if (recoverable(err)) {
+            Peer randomPeer = randomNode(peer);
+
             LOG.warn(
                     "Recoverable error during the request occurred (will be 
retried on the randomly selected node) "
-                            + "[request={}, peer={}].",
+                            + "[request={}, peer={}, newPeer={}].",
                     err,
                     sentRequest,
-                    peer
+                    peer,
+                    randomPeer
             );
 
-            scheduleRetry(() -> sendWithRetry(randomNode(peer), 
requestFactory, stopTime, fut));
+            scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory, 
stopTime, fut));
         } else {
             fut.completeExceptionally(err);
         }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index ab22a7f004..90f3e8481b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
@@ -76,9 +77,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
             throws ExecutionException, InterruptedException {
         createTestTableWith3Replicas();
 
-        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
-
-        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+        var tblReplicationGrp = defaultTablePartitionId(node(0));
 
         CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
                 tblReplicationGrp,
@@ -101,6 +100,8 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         Tuple keyTpl = Tuple.create().set("key", 42);
         Tuple tpl = Tuple.create().set("key", 42).set("val", "val 42");
 
+        TableImpl tbl = (TableImpl) coordinatorNode.tables().table(TABLE_NAME);
+
         tbl.recordView().upsert(rwTx, tpl);
 
         msgConf.accept(primaryNode, coordinatorNode, tbl, rwTx);
@@ -108,6 +109,12 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         finisher.accept(rwTx, tbl, keyTpl);
     }
 
+    private TablePartitionId defaultTablePartitionId(IgniteImpl node) {
+        TableImpl table = (TableImpl) node.tables().table(TABLE_NAME);
+
+        return new TablePartitionId(table.tableId(), 0);
+    }
+
     private void commitRow(InternalTransaction rwTx, TableImpl tbl, Tuple 
keyTpl) {
         rwTx.commit();
 
@@ -139,12 +146,16 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
 
         AtomicBoolean dropMessage = new AtomicBoolean(true);
 
+        CountDownLatch commitStartedLatch = new CountDownLatch(1);
+
         // Make sure the finish message is prepared, i.e. the outcome, commit 
timestamp, primary node, etc. have been set,
         // and then temporarily block the messaging to simulate network issues.
         coordinatorMessaging.dropMessages((s, networkMessage) -> {
             if (networkMessage instanceof TxFinishReplicaRequest && 
dropMessage.get()) {
                 logger().info("Dropping: {}.", networkMessage);
 
+                commitStartedLatch.countDown();
+
                 return true;
             }
 
@@ -156,6 +167,8 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         // will run in the current thread.
         CompletableFuture.runAsync(() -> {
             try {
+                commitStartedLatch.await();
+
                 logger().info("Start transferring primary.");
 
                 NodeUtils.transferPrimary(tbl, null, this::node);
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 6afc21aa95..d8611e60dc 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -164,6 +164,7 @@ import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.VaultService;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.DefaultMessagingService;
@@ -1188,6 +1189,12 @@ public class IgniteImpl implements Ignite {
         return ((DefaultMessagingService) 
clusterSvc.messagingService()).dropMessagesPredicate();
     }
 
+    // TODO IGNITE-18493 - remove/move this
+    @TestOnly
+    public void sendFakeMessage(String recipientConsistentId, NetworkMessage 
msg) {
+        clusterSvc.messagingService().send(recipientConsistentId, 
ChannelType.DEFAULT, msg);
+    }
+
     // TODO: IGNITE-18493 - remove/move this
     @TestOnly
     public void stopDroppingMessages() {
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index f69509742c..c54a802490 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -378,6 +378,31 @@ public class Cluster {
         nodes.set(index, null);
     }
 
+    /**
+     * Stops a node by name.
+     *
+     * @param name Name of the node in the cluster.
+     */
+    public void stopNode(String name) {
+        stopNode(nodeIndex(name));
+    }
+
+    /**
+     * Returns index of the node.
+     *
+     * @param name Node name.
+     * @return Node index.
+     */
+    public int nodeIndex(String name) {
+        for (int i = 0; i < nodes.size(); i++) {
+            if (nodes.get(i) != null && nodes.get(i).name().equals(name)) {
+                return i;
+            }
+        }
+
+        throw new IllegalArgumentException("Node is not found: " + name);
+    }
+
     /**
      * Restarts a node by index.
      *
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index cc1d6fc133..ed657d7e85 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
 
 import java.nio.file.Path;
 import java.util.List;
+import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -171,6 +172,22 @@ public abstract class ClusterPerTestIntegrationTest 
extends IgniteIntegrationTes
         cluster.stopNode(nodeIndex);
     }
 
+    /**
+     * Stops a node by name.
+     *
+     * @param name Name of the node in the cluster.
+     */
+    protected void stopNode(String name) {
+        cluster.stopNode(name);
+    }
+
+    /**
+     * Returns nodes that are started and not stopped. This can include 
knocked out nodes.
+     */
+    protected final Stream<IgniteImpl> runningNodes() {
+        return cluster.runningNodes();
+    }
+
     /**
      * Restarts a node by index.
      *
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index c72763b031..c0b023a208 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -173,7 +173,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
             public CompletableFuture<Void> finish(
                     HybridTimestampTracker observableTimestampTracker,
                     TablePartitionId commitPartition,
-                    boolean commit,
+                    boolean commitIntent,
                     Map<TablePartitionId, Long> enlistedGroups,
                     UUID txId
             ) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
similarity index 77%
rename from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
rename to 
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index 694f27620e..d72df1b78e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -26,6 +26,9 @@ import static 
org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -34,13 +37,19 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.stream.IntStream;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import 
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -60,6 +69,7 @@ import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
@@ -70,6 +80,8 @@ import org.junit.jupiter.api.TestInfo;
  * Abandoned transactions integration tests.
  */
 public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest {
+    private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
     /** Table name. */
     private static final String TABLE_NAME = "test_table";
 
@@ -104,16 +116,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -168,16 +171,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -220,16 +214,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -275,16 +260,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -329,16 +305,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -402,16 +369,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -480,16 +438,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -564,16 +513,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -650,16 +590,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node(0).clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
         IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
 
@@ -690,6 +621,156 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
         assertThat(finish2, 
willThrow(TransactionAlreadyFinishedException.class));
     }
 
+    @Test
+    public void testPrimaryFailureRightAfterCommitMsg() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+        IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", 
commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator node is determined [node={}].", 
txCrdNode.name());
+
+        Transaction rwTx1 = createRwTransaction(txCrdNode);
+
+        CompletableFuture<?> commitMsgSentFut = new CompletableFuture<>();
+        CompletableFuture<?> cancelLeaseFuture = new CompletableFuture<>();
+
+        txCrdNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof TxFinishReplicaRequest) {
+                boolean isFirst = !commitMsgSentFut.isDone();
+
+                if (isFirst) {
+                    commitMsgSentFut.complete(null);
+
+                    return true;
+                } else {
+                    cancelLeaseFuture.join();
+
+                    return false;
+                }
+            }
+
+            return false;
+        });
+
+        CompletableFuture<Void> commitFut = rwTx1.commitAsync();
+
+        assertThat(commitMsgSentFut, willCompleteSuccessfully());
+
+        cancelLease(commitPartNode, tblReplicationGrp);
+
+        waitAndGetLeaseholder(txCrdNode, tblReplicationGrp);
+
+        cancelLeaseFuture.complete(null);
+
+        assertThat(commitFut, willCompleteSuccessfully());
+
+        RecordView<Tuple> view = 
txCrdNode.tables().table(TABLE_NAME).recordView();
+
+        var rec = view.get(null, Tuple.create().set("key", 42));
+
+        assertNotNull(rec);
+        assertEquals((Integer) 42, rec.value("key"));
+        assertEquals("val1", rec.value("val"));
+    }
+
+    @Test
+    public void testPrimaryFailureWhileInflightInProgress() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+        IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", 
commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator node is determined [node={}].", 
txCrdNode.name());
+
+        Transaction rwTx1 = createRwTransaction(txCrdNode);
+
+        txCrdNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof ReadWriteSingleRowReplicaRequest) {
+                return true;
+            }
+
+            return false;
+        });
+
+        assertThrows(TransactionException.class, () -> {
+            RecordView<Tuple> view = 
txCrdNode.tables().table(TABLE_NAME).recordView();
+            view.upsert(rwTx1, Tuple.create().set("key", 1).set("val", 
"val1"));
+        });
+
+        CompletableFuture<Void> commitFut = rwTx1.commitAsync();
+
+        commitPartNode.stop();
+
+        assertThat(commitFut, willCompleteSuccessfully());
+    }
+
+    @Test
+    public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse() 
throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+        IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", 
commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator node is determined [node={}].", 
txCrdNode.name());
+
+        CompletableFuture<?> firstResponseSent = new CompletableFuture<>();
+
+        commitPartNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof TimestampAwareReplicaResponse) {
+                TimestampAwareReplicaResponse response = 
(TimestampAwareReplicaResponse) msg;
+
+                if (response.result() == null) {
+                    firstResponseSent.complete(null);
+                }
+
+                // This means this is the second response that finishes an 
in-flight future.
+                if (response.result() instanceof UUID) {
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        Transaction rwTx1 = createRwTransaction(txCrdNode);
+
+        CompletableFuture<Void> commitFut = rwTx1.commitAsync();
+
+        assertThat(firstResponseSent, willCompleteSuccessfully());
+
+        cancelLease(commitPartNode, tblReplicationGrp);
+
+        assertThat(commitFut, 
willThrow(TransactionAlreadyFinishedException.class, 30, SECONDS));
+
+        RecordView<Tuple> view = 
txCrdNode.tables().table(TABLE_NAME).recordView();
+
+        var rec = view.get(null, Tuple.create().set("key", 42));
+
+        assertNull(rec);
+    }
+
     private DefaultMessagingService messaging(IgniteImpl node) {
         ClusterService coordinatorService = 
IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "clusterSvc");
 
@@ -802,19 +883,42 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
         return rwTx1;
     }
 
-    private IgniteImpl commitPartitionPrimaryNode(String leaseholder) {
-        return IntStream.range(0, initialNodes())
+    private IgniteImpl findNode(int startRange, int endRange, 
Predicate<IgniteImpl> filter) {
+        return IntStream.range(startRange, endRange)
                 .mapToObj(this::node)
-                .filter(n -> leaseholder.equals(n.name()))
+                .filter(filter::test)
                 .findFirst()
                 .get();
     }
 
+    private IgniteImpl commitPartitionPrimaryNode(String leaseholder) {
+        return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
+    }
+
     private IgniteImpl nonPrimaryNode(String leaseholder) {
-        return IntStream.range(1, initialNodes())
-                .mapToObj(this::node)
-                .filter(n -> !leaseholder.equals(n.name()))
-                .findFirst()
-                .get();
+        return findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
+    }
+
+    private String waitAndGetLeaseholder(IgniteImpl node, ReplicationGroupId 
tblReplicationGrp) throws InterruptedException {
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node.placementDriver().awaitPrimaryReplica(
+                tblReplicationGrp,
+                node.clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        return primaryReplicaFut.join().getLeaseholder();
+    }
+
+    private void cancelLease(IgniteImpl leaseholder, ReplicationGroupId 
groupId) {
+        StopLeaseProlongationMessage msg = PLACEMENT_DRIVER_MESSAGES_FACTORY
+                .stopLeaseProlongationMessage()
+                .groupId(groupId)
+                .build();
+
+        // Just sent it to all nodes to not determine the exact placement 
driver active actor.
+        runningNodes().forEach(node -> 
leaseholder.sendFakeMessage(node.name(), msg));
     }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 5499863538..b3505d2d21 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -429,7 +430,13 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         });
 
         var err = assertThrows(CompletionException.class, fut0::join);
-        assertEquals(IllegalArgumentException.class, 
err.getCause().getClass());
+
+        try {
+            assertInstanceOf(IllegalArgumentException.class, err.getCause());
+        } catch (AssertionError e) {
+            throw new AssertionError("Unexpected exception type", err);
+        }
+
         assertEquals(balance, view.get(null, 
makeKey(1)).doubleValue("balance"));
     }
 
@@ -449,7 +456,12 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         });
 
         var err = assertThrows(CompletionException.class, fut0::join);
-        assertEquals(NullPointerException.class, err.getCause().getClass());
+
+        try {
+            assertInstanceOf(NullPointerException.class, err.getCause());
+        } catch (AssertionError e) {
+            throw new AssertionError("Unexpected exception type", err);
+        }
     }
 
     @Test
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
index 2121aafa3d..6d14d467fa 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
@@ -31,11 +31,15 @@ public class TransactionAlreadyFinishedException extends 
TransactionException {
     /** Stored transaction result. */
     private final TransactionResult transactionResult;
 
-    public TransactionAlreadyFinishedException(String message, 
TransactionResult transactionResult) {
-        super(TX_UNEXPECTED_STATE_ERR, message);
+    public TransactionAlreadyFinishedException(int errorCode, String message, 
TransactionResult transactionResult, Throwable cause) {
+        super(errorCode, message, cause);
         this.transactionResult = transactionResult;
     }
 
+    public TransactionAlreadyFinishedException(String message, 
TransactionResult transactionResult) {
+        this(TX_UNEXPECTED_STATE_ERR, message, transactionResult, null);
+    }
+
     public TransactionResult transactionResult() {
         return transactionResult;
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
index a684f6fc89..fdbbaffdfb 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
@@ -38,7 +38,7 @@ public class PrimaryReplicaExpiredException extends 
IgniteInternalException {
     public PrimaryReplicaExpiredException(
             ReplicationGroupId groupId,
             Long expectedEnlistmentConsistencyToken,
-            HybridTimestamp commitTimestamp,
+            @Nullable HybridTimestamp commitTimestamp,
             @Nullable ReplicaMeta currentPrimaryReplica
     ) {
         super(
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index cd8c65eb67..e40523e56d 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -25,10 +26,12 @@ import static 
org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
 import java.io.IOException;
@@ -53,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -60,10 +64,14 @@ import 
org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
@@ -174,6 +182,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
      */
     private final TxMessageSender txMessageSender;
 
+    private final EventListener<PrimaryReplicaEventParameters> 
primaryReplicaEventListener;
+
     /**
      * The constructor.
      *
@@ -204,6 +214,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
         this.topologyService = clusterService.topologyService();
         this.messagingService = clusterService.messagingService();
+        this.primaryReplicaEventListener = this::primaryReplicaEventListener;
 
         placementDriverHelper = new PlacementDriverHelper(placementDriver, 
clock);
 
@@ -229,6 +240,30 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, writeIntentSwitchProcessor);
     }
 
+    private CompletableFuture<Boolean> 
primaryReplicaEventListener(PrimaryReplicaEventParameters eventParameters, 
Throwable err) {
+        return inBusyLock(busyLock, () -> {
+            if (!(eventParameters.groupId() instanceof TablePartitionId)) {
+                return falseCompletedFuture();
+            }
+
+            TablePartitionId groupId = (TablePartitionId) 
eventParameters.groupId();
+
+            for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) {
+                TxContext txContext = ctxEntry.getValue();
+
+                if (txContext.isTxFinishing()) {
+                    Long enlistmentConsistencyToken = 
txContext.enlistedGroups.get(groupId);
+
+                    if (enlistmentConsistencyToken != null) {
+                        txContext.cancelWaitingInflights(groupId, 
enlistmentConsistencyToken);
+                    }
+                }
+            }
+
+            return falseCompletedFuture();
+        });
+    }
+
     @Override
     public InternalTransaction begin(HybridTimestampTracker timestampTracker) {
         return begin(timestampTracker, false, TxPriority.NORMAL);
@@ -330,17 +365,19 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     public CompletableFuture<Void> finish(
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
-            boolean commit,
+            boolean commitIntent,
             Map<TablePartitionId, Long> enlistedGroups,
             UUID txId
     ) {
-        LOG.debug("Finish [commit={}, txId={}, groups={}].", commit, txId, 
enlistedGroups);
+        LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, 
txId, enlistedGroups);
 
         assert enlistedGroups != null;
 
         if (enlistedGroups.isEmpty()) {
             // If there are no enlisted groups, just update local state - we 
already marked the tx as finished.
-            updateTxMeta(txId, old -> new TxStateMeta(commit ? COMMITTED : 
ABORTED, localNodeId, commitPartition, commitTimestamp(commit)));
+            updateTxMeta(txId, old -> new TxStateMeta(
+                    commitIntent ? COMMITTED : ABORTED, localNodeId, 
commitPartition, commitTimestamp(commitIntent)
+            ));
 
             return nullCompletedFuture();
         }
@@ -367,27 +404,17 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             // If the state is FINISHING then someone else hase in in the 
middle of finishing this tx.
             if (stateMeta.txState() == FINISHING) {
                 return ((TxStateMetaFinishing) stateMeta).txFinishFuture()
-                        .thenCompose(meta -> checkTxOutcome(commit, txId, 
meta));
+                        .thenCompose(meta -> checkTxOutcome(commitIntent, 
txId, meta));
             } else {
                 // The TX has already been finished. Check whether it finished 
with the same outcome.
-                return checkTxOutcome(commit, txId, stateMeta);
+                return checkTxOutcome(commitIntent, txId, stateMeta);
             }
         }
 
-        TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
-            if (tuple0 == null) {
-                tuple0 = new TxContext(); // No writes enlisted.
-            }
-
-            assert !tuple0.isTxFinishing() : "Transaction is already finished 
[id=" + uuid + "].";
-
-            tuple0.finishTx();
-
-            return tuple0;
-        });
+        TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups, 
commitIntent);
 
         // Wait for commit acks first, then proceed with the finish request.
-        return tuple.performFinish(commit, ignored ->
+        return txContext.performFinish(commitIntent, commit ->
                 prepareFinish(
                         observableTimestampTracker,
                         commitPartition,
@@ -398,12 +425,26 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 ));
     }
 
+    private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, 
Long> enlistedGroups, boolean commitIntent) {
+        return txCtxMap.compute(txId, (uuid, tuple0) -> {
+            if (tuple0 == null) {
+                tuple0 = new TxContext(placementDriver); // No writes enlisted.
+            }
+
+            assert !tuple0.isTxFinishing() : "Transaction is already finished 
[id=" + uuid + "].";
+
+            tuple0.finishTx(enlistedGroups);
+
+            return tuple0;
+        });
+    }
+
     private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID 
txId, TransactionMeta stateMeta) {
         if ((stateMeta.txState() == COMMITTED) == commit) {
             return nullCompletedFuture();
         }
 
-        return CompletableFuture.failedFuture(new 
TransactionAlreadyFinishedException(
+        return failedFuture(new TransactionAlreadyFinishedException(
                 "Failed to change the outcome of a finished transaction 
[txId=" + txId + ", txState=" + stateMeta.txState() + "].",
                 new TransactionResult(stateMeta.txState(), 
stateMeta.commitTimestamp()))
         );
@@ -440,7 +481,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                                     txFinishFuture);
                         })
                 .thenCompose(Function.identity())
-                // verification future is added in order to share proper 
exception with the client
+                // Verification future is added in order to share the proper 
verification exception with the client.
                 .thenCompose(r -> verificationFuture);
     }
 
@@ -595,6 +636,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         txCleanupRequestHandler.start();
 
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
primaryReplicaEventListener);
+
         return nullCompletedFuture();
     }
 
@@ -614,6 +657,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         txCleanupRequestHandler.stop();
 
+        
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
primaryReplicaEventListener);
+
         shutdownAndAwaitTermination(cleanupExecutor, 10, TimeUnit.SECONDS);
     }
 
@@ -683,7 +728,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         txCtxMap.compute(txId, (uuid, tuple) -> {
             if (tuple == null) {
-                tuple = new TxContext();
+                tuple = new TxContext(placementDriver);
             }
 
             if (tuple.isTxFinishing()) {
@@ -727,9 +772,9 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         }
 
         // Process directly sent response.
-        ReplicaResponse request = (ReplicaResponse) message;
+        ReplicaResponse response = (ReplicaResponse) message;
 
-        Object result = request.result();
+        Object result = response.result();
 
         if (result instanceof UUID) {
             removeInflight((UUID) result);
@@ -769,7 +814,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                                             "Commit timestamp is greater than 
primary replica expiration timestamp:"
                                                     + " [groupId = {}, commit 
timestamp = {}, primary replica expiration timestamp = {}]",
                                             groupId, commitTimestamp, 
currentPrimaryReplica.getExpirationTime());
-
                         }
                     });
         }
@@ -780,25 +824,63 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     private static class TxContext {
         volatile long inflights = 0; // Updated under lock.
         private final CompletableFuture<Void> waitRepFut = new 
CompletableFuture<>();
+        private final PlacementDriver placementDriver;
         volatile CompletableFuture<Void> finishInProgressFuture = null;
+        volatile Map<TablePartitionId, Long> enlistedGroups;
+
+        private TxContext(PlacementDriver placementDriver) {
+            this.placementDriver = placementDriver;
+        }
 
-        CompletableFuture<Void> performFinish(boolean commit, Function<Void, 
CompletableFuture<Void>> finishAction) {
+        CompletableFuture<Void> performFinish(boolean commit, 
Function<Boolean, CompletableFuture<Void>> finishAction) {
             waitReadyToFinish(commit)
-                    .thenCompose(finishAction)
-                    .handle((ignored, err) -> {
-                        if (err == null) {
-                            finishInProgressFuture.complete(null);
-                        } else {
-                            finishInProgressFuture.completeExceptionally(err);
-                        }
-                        return null;
-                    });
+                    .whenComplete((ignoredReadyToFinish, readyException) -> 
finishAction.apply(commit && readyException == null)
+                            .whenComplete((ignoredFinishActionResult, 
finishException) ->
+                                    completeFinishInProgressFuture(commit, 
readyException, finishException))
+                    );
 
             return finishInProgressFuture;
         }
 
+        private void completeFinishInProgressFuture(
+                boolean commit,
+                @Nullable Throwable readyToFinishException,
+                @Nullable Throwable finishException
+        ) {
+            if (readyToFinishException == null) {
+                if (finishException == null) {
+                    finishInProgressFuture.complete(null);
+                } else {
+                    
finishInProgressFuture.completeExceptionally(finishException);
+                }
+            } else {
+                if (commit && readyToFinishException instanceof 
PrimaryReplicaExpiredException) {
+                    finishInProgressFuture.completeExceptionally(new 
TransactionAlreadyFinishedException(
+                            TX_PRIMARY_REPLICA_EXPIRED_ERR,
+                            "Failed to commit the transaction.",
+                            new TransactionResult(ABORTED, null),
+                            readyToFinishException
+                    ));
+                } else {
+                    
finishInProgressFuture.completeExceptionally(readyToFinishException);
+                }
+            }
+        }
+
         private CompletableFuture<Void> waitReadyToFinish(boolean commit) {
-            return commit ? waitNoInflights() : nullCompletedFuture();
+            if (commit) {
+                for (Map.Entry<TablePartitionId, Long> e : 
enlistedGroups.entrySet()) {
+                    ReplicaMeta replicaMeta = 
placementDriver.currentLease(e.getKey());
+
+                    if (replicaMeta == null || 
!e.getValue().equals(replicaMeta.getStartTime().longValue())) {
+                        return failedFuture(new 
PrimaryReplicaExpiredException(e.getKey(), e.getValue(), null, replicaMeta));
+                    }
+                }
+
+                return waitNoInflights();
+            } else {
+                return nullCompletedFuture();
+            }
         }
 
         private CompletableFuture<Void> waitNoInflights() {
@@ -808,13 +890,18 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             return waitRepFut;
         }
 
+        private void cancelWaitingInflights(TablePartitionId groupId, Long 
enlistmentConsistencyToken) {
+            waitRepFut.completeExceptionally(new 
PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null, 
null));
+        }
+
         void onRemovedInflights() {
             if (inflights == 0 && finishInProgressFuture != null) {
                 waitRepFut.complete(null);
             }
         }
 
-        void finishTx() {
+        void finishTx(Map<TablePartitionId, Long> enlistedGroups) {
+            this.enlistedGroups = enlistedGroups;
             finishInProgressFuture = new CompletableFuture<>();
         }
 
@@ -832,6 +919,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         private static final Set<Class<? extends Throwable>> RECOVERABLE = 
Set.of(
                 TimeoutException.class,
                 IOException.class,
+                ReplicationException.class,
                 ReplicationTimeoutException.class,
                 PrimaryReplicaMissException.class
         );
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
index f32ef4e0a6..dc66dbec6c 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
@@ -288,6 +288,23 @@ public abstract class AbstractDeadlockPreventionTest 
extends AbstractLockingTest
         assertThat(futTx1, willSucceedFast());
     }
 
+    @Test
+    public void testIncompatibleLockRetry() {
+        var tx1 = beginTx();
+        var tx2 = beginTx();
+
+        var k = key("test");
+
+        assertThat(slock(tx1, k), willSucceedFast());
+        assertThat(slock(tx2, k), willSucceedFast());
+
+        assertFutureFailsOrWaitsForTimeout(() -> xlock(tx2, k));
+
+        commitTx(tx1);
+
+        assertThat(xlock(tx2, k), willSucceedFast());
+    }
+
     /**
      * This method checks lock future of conflicting transaction provided by 
supplier, in a way depending on deadlock prevention policy.
      * If the policy does not allow wait on conflict (wait timeout is equal to 
{@code 0}) then the future must be failed with
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 1614346449..287dde26cf 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -360,6 +360,8 @@ public class TxManagerTest extends IgniteAbstractTest {
         // Same primary that was enlisted is returned during finish phase and 
commitTimestamp is less that primary.expirationTimestamp.
         when(placementDriver.getPrimaryReplica(any(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
+        when(placementDriver.currentLease(any())).thenReturn(
+                new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE));
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
 
@@ -383,6 +385,8 @@ public class TxManagerTest extends IgniteAbstractTest {
                 .thenReturn(
                         completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), hybridTimestamp(10)))
                 );
+        when(placementDriver.currentLease(any()))
+                .thenReturn(new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), hybridTimestamp(10)));
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any()))
                 .thenReturn(
                         completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), hybridTimestamp(10))),
@@ -420,7 +424,7 @@ public class TxManagerTest extends IgniteAbstractTest {
     @Test
     public void testFinishExpiredWithNullPrimary() {
         // Null is returned as primaryReplica during finish phase.
-        when(placementDriver.getPrimaryReplica(any(), 
any())).thenReturn(nullCompletedFuture());
+        when(placementDriver.currentLease(any())).thenReturn(null);
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
hybridTimestamp(10))));
         when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
@@ -434,7 +438,7 @@ public class TxManagerTest extends IgniteAbstractTest {
     @Test
     public void testExpiredExceptionDoesNotShadeResponseExceptions() {
         // Null is returned as primaryReplica during finish phase.
-        when(placementDriver.getPrimaryReplica(any(), 
any())).thenReturn(nullCompletedFuture());
+        when(placementDriver.currentLease(any())).thenReturn(null);
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
hybridTimestamp(10))));
         when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
@@ -469,6 +473,8 @@ public class TxManagerTest extends IgniteAbstractTest {
         when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any()))
                 .thenReturn(completedFuture(
                         new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), HybridTimestamp.MAX_VALUE)));
+        when(placementDriver.currentLease(eq(tablePartitionId1)))
+                .thenReturn(new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), HybridTimestamp.MAX_VALUE));
         when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId1), any(), 
anyLong(), any()))
                 .thenReturn(completedFuture(
                         new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), HybridTimestamp.MAX_VALUE)));
@@ -499,6 +505,7 @@ public class TxManagerTest extends IgniteAbstractTest {
         // given test checks that an assertion exception will be thrown and 
wrapped with proper transaction public one.
         when(placementDriver.getPrimaryReplica(any(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
hybridTimestamp(10))));
+        when(placementDriver.currentLease(any())).thenReturn(new 
TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)));
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
hybridTimestamp(10))));
         when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
@@ -520,8 +527,8 @@ public class TxManagerTest extends IgniteAbstractTest {
     @Test
     public void testFinishExpiredWithDifferentEnlistmentConsistencyToken() {
         // Primary with another enlistment consistency token is returned.
-        when(placementDriver.getPrimaryReplica(any(), 
any())).thenReturn(completedFuture(
-                new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), 
HybridTimestamp.MAX_VALUE)));
+        when(placementDriver.currentLease(any())).thenReturn(
+                new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), 
HybridTimestamp.MAX_VALUE));
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), 
HybridTimestamp.MAX_VALUE)));
         when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))

Reply via email to