This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a26a6714d IGNITE-21213 Coordination of mechanisms of determination 
for primar (#3097)
7a26a6714d is described below

commit 7a26a6714d668968fe0b69ed3664672d035e01e7
Author: Cyrill <cyrill.si...@gmail.com>
AuthorDate: Tue Feb 6 16:28:54 2024 +0300

    IGNITE-21213 Coordination of mechanisms of determination for primar (#3097)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |   5 +-
 .../ItPrimaryReplicaChoiceTest.java                |   6 ++
 .../ignite/internal/replicator/ReplicaManager.java |  56 +---------
 .../replicator/listener/ReplicaListener.java       |  18 ----
 .../ignite/internal/table/ItColocationTest.java    |   3 +-
 .../internal/table/ItTransactionRecoveryTest.java  |   3 +-
 .../replicator/PartitionReplicaListener.java       | 120 +++------------------
 .../replication/PartitionReplicaListenerTest.java  |  11 +-
 .../org/apache/ignite/internal/tx/TxManager.java   |   9 +-
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  10 +-
 .../internal/tx/impl/TxCleanupRequestSender.java   |  21 +++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  55 ++++++----
 .../ignite/internal/tx/impl/TxMessageSender.java   |   3 +-
 .../tx/message/TxFinishReplicaRequest.java         |   4 +-
 .../apache/ignite/internal/tx/TxCleanupTest.java   |  38 +++++--
 .../apache/ignite/internal/tx/TxManagerTest.java   |  24 ++---
 16 files changed, 135 insertions(+), 251 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index aeafe657c6..72e1615f8e 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.client.fakes;
 
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -186,7 +185,7 @@ public class FakeTxManager implements TxManager {
             HybridTimestampTracker timestampTracker,
             TablePartitionId commitPartition,
             boolean commit,
-            Map<TablePartitionId, Long> enlistedGroups,
+            Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups,
             UUID txId
     ) {
         return nullCompletedFuture();
@@ -194,7 +193,7 @@ public class FakeTxManager implements TxManager {
 
     @Override
     public CompletableFuture<Void> cleanup(
-            Collection<TablePartitionId> partitions,
+            Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index f8d8e7b54f..40e8e00455 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -246,6 +246,12 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
         NodeUtils.transferPrimary(tbl, null, this::node);
 
+        
assertTrue(ignite.txManager().lockManager().locks(rwTx.id()).hasNext());
+        assertEquals(6, partitionStorage.pendingCursors() + 
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
+
+        rwTx.rollback();
+
+
         
assertFalse(ignite.txManager().lockManager().locks(rwTx.id()).hasNext());
         assertEquals(3, partitionStorage.pendingCursors() + 
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
     }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index ba75082063..1f088fe3c2 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -21,8 +21,6 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
-import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
-import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -57,8 +55,6 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessageHandler;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
-import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
@@ -232,9 +228,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "replica", LOG)
         );
-
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
this::onPrimaryReplicaElected);
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
this::onPrimaryReplicaExpired);
     }
 
     private void onReplicaMessageReceived(NetworkMessage message, String 
senderConsistentId, @Nullable Long correlationId) {
@@ -532,6 +525,10 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 .thenCompose(v -> replicaFuture);
     }
 
+    private static boolean isCompletedSuccessfully(CompletableFuture<?> 
future) {
+        return future.isDone() && !future.isCompletedExceptionally();
+    }
+
     /**
      * Stops a replica by the partition group id.
      *
@@ -750,51 +747,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         }
     }
 
-
-    /**
-     * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}. 
Propagates execution to the
-     *      {@link 
ReplicaListener#onPrimaryElected(PrimaryReplicaEventParameters, Throwable)} of 
the replica, that corresponds
-     *      to a given {@link PrimaryReplicaEventParameters#groupId()}.
-     */
-    private CompletableFuture<Boolean> onPrimaryReplicaElected(
-            PrimaryReplicaEventParameters primaryReplicaEventParameters,
-            Throwable throwable
-    ) {
-        CompletableFuture<Replica> replica = 
replicas.get(primaryReplicaEventParameters.groupId());
-
-        if (replica == null) {
-            return falseCompletedFuture();
-        }
-
-        if (replica.isDone() && !replica.isCompletedExceptionally()) {
-            return 
replica.join().replicaListener().onPrimaryElected(primaryReplicaEventParameters,
 throwable);
-        } else {
-            return replica.thenCompose(r -> 
r.replicaListener().onPrimaryElected(primaryReplicaEventParameters, throwable));
-        }
-    }
-
-    /**
-     * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED}. 
Propagates execution to the
-     *      {@link 
ReplicaListener#onPrimaryExpired(PrimaryReplicaEventParameters, Throwable)} of 
the replica, that corresponds
-     *      to a given {@link PrimaryReplicaEventParameters#groupId()}.
-     */
-    private CompletableFuture<Boolean> onPrimaryReplicaExpired(
-            PrimaryReplicaEventParameters primaryReplicaEventParameters,
-            Throwable throwable
-    ) {
-        CompletableFuture<Replica> replica = 
replicas.get(primaryReplicaEventParameters.groupId());
-
-        if (replica == null) {
-            return falseCompletedFuture();
-        }
-
-        if (replica.isDone() && !replica.isCompletedExceptionally()) {
-            return 
replica.join().replicaListener().onPrimaryExpired(primaryReplicaEventParameters,
 throwable);
-        } else {
-            return replica.thenCompose(r -> 
r.replicaListener().onPrimaryExpired(primaryReplicaEventParameters, throwable));
-        }
-    }
-
     /**
      * Idle safe time sync for replicas.
      */
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
index eeee59b449..88a1937e97 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
@@ -18,12 +18,8 @@
 package org.apache.ignite.internal.replicator.listener;
 
 import java.util.concurrent.CompletableFuture;
-import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
-import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.util.CompletableFutures;
-import org.jetbrains.annotations.Nullable;
 
 /** Replica listener. */
 @FunctionalInterface
@@ -37,20 +33,6 @@ public interface ReplicaListener {
      */
     CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String 
senderId);
 
-    /**
-     * Invoked by {@link ReplicaManager} when current replica is elected as 
primary.
-     */
-    default CompletableFuture<Boolean> 
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
-        return CompletableFutures.falseCompletedFuture();
-    }
-
-    /**
-     * Invoked by {@link ReplicaManager} then current replica stops being a 
primary replica.
-     */
-    default CompletableFuture<Boolean> 
onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
-        return CompletableFutures.falseCompletedFuture();
-    }
-
     /** Callback on replica shutdown. */
     default void onShutdown() {
         // No-op.
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index ee58ccc2fa..d65d494523 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -60,6 +60,7 @@ import java.util.stream.Stream;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
@@ -176,7 +177,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                     HybridTimestampTracker observableTimestampTracker,
                     TablePartitionId commitPartition,
                     boolean commitIntent,
-                    Map<TablePartitionId, Long> enlistedGroups,
+                    Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups,
                     UUID txId
             ) {
                 return nullCompletedFuture();
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index bf12690805..7236d54ee5 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -42,6 +42,7 @@ import java.util.stream.IntStream;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.DefaultMessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
@@ -614,7 +615,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
                 new HybridTimestampTracker(),
                 ((InternalTransaction) rwTx1).commitPartition(),
                 false,
-                Map.of(((InternalTransaction) rwTx1).commitPartition(), 0L),
+                Map.of(((InternalTransaction) rwTx1).commitPartition(), new 
IgniteBiTuple<>(txCrdNode2.node(), 0L)),
                 rwTx1Id
         );
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 6b83d12eae..ed0d733ee8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -47,7 +47,6 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
-import static org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -84,12 +83,10 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.lang.IgniteUuid;
-import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -360,79 +357,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         prepareIndexBuilderTxRwOperationTracker();
     }
 
-    private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) {
-        Collection<TablePartitionId> enlistedPartitions = 
txMeta.enlistedPartitions();
-
-        boolean commit = txMeta.txState() == COMMITTED;
-
-        HybridTimestamp commitTimestamp = txMeta.commitTimestamp();
-
-        return txManager.cleanup(enlistedPartitions, commit, commitTimestamp, 
txId)
-                .handle((v, e) -> {
-                    if (e != null) {
-                        LOG.warn("Failed to execute cleanup on commit 
partition primary replica switch [txId={}, commitPartition={}]",
-                                e, txId, replicationGroupId);
-
-                        if (hasCause(e, null, NodeStoppingException.class)) {
-                            return nullCompletedFuture();
-                        } else {
-                            return txManager.executeCleanupAsync(() -> 
durableCleanup(txId, txMeta));
-                        }
-                    } else {
-                        return nullCompletedFuture();
-                    }
-                })
-                .thenCompose(f -> f);
-    }
-
-    @Override
-    public CompletableFuture<Boolean> 
onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
-        assert replicationGroupId.equals(evt.groupId()) : format(
-                "The replication group listener does not match the event 
[grp={}, eventGrp={}]",
-                replicationGroupId,
-                evt.groupId()
-        );
-
-        if (!localNode.id().equals(evt.leaseholderId())) {
-            return falseCompletedFuture();
-        }
-
-        LOG.info("Primary replica expired [grp={}]", replicationGroupId);
-
-        ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
-
-        for (UUID txId : txCleanupReadyFutures.keySet()) {
-            txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-                if (txOps == null) {
-                    return null;
-                }
-
-                if (!txOps.futures.isEmpty()) {
-                    CompletableFuture<?>[] txFuts = 
txOps.futures.values().stream()
-                            .flatMap(Collection::stream)
-                            .toArray(CompletableFuture[]::new);
-
-                    futs.add(allOf(txFuts).whenComplete((unused, throwable) -> 
{
-                        releaseTxLocks(txId);
-
-                        try {
-                            closeAllTransactionCursors(txId);
-                        } catch (Exception e) {
-                            LOG.warn("Unable to clear resource for transaction 
on primary replica expiration [tx={}, replicationGrp={}]", e,
-                                    txId, replicationGroupId);
-                        }
-                    }));
-
-                    txOps.futures.clear();
-                }
-
-                return txOps;
-            });
-        }
-
-        return allOf(futs.toArray(CompletableFuture[]::new)).thenApply(unused 
-> false);
-    }
-
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
String senderId) {
         return ensureReplicaIsPrimary(request)
@@ -490,10 +414,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         // Check whether a transaction has already been finished.
         if (txMeta != null && isFinalState(txMeta.txState())) {
-            return recoverFinishedTx(txId, txMeta)
-                    // If the sender has sent a recovery message, it failed to 
handle it on its own,
-                    // so sending cleanup to the sender for the transaction we 
know is finished.
-                    .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
+            return runCleanupOnNode(txId, senderId);
         }
 
         LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", 
txId, txMeta);
@@ -501,16 +422,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return triggerTxRecovery(txId, senderId);
     }
 
-    private CompletableFuture<Void> recoverFinishedTx(UUID txId, TxMeta 
txMeta) {
-        if (txMeta.enlistedPartitions().isEmpty()) {
-            // Nothing to do if there are no enlistedPartitions available.
-            return nullCompletedFuture();
-        }
-
-        // Otherwise run a cleanup on the known set of partitions.
-        return (CompletableFuture<Void>) durableCleanup(txId, txMeta);
-    }
-
     /**
      * Run cleanup on a node.
      *
@@ -538,7 +449,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         new HybridTimestampTracker(),
                         replicationGroupId,
                         false,
-                        Map.of(replicationGroupId, 0L), // term is not 
required for the rollback.
+                        // term is not required for the rollback.
+                        Map.of(replicationGroupId, new 
IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)),
                         txId
                 )
                 .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
@@ -1552,14 +1464,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     // TODO: need to properly handle primary replica changes 
https://issues.apache.org/jira/browse/IGNITE-17615
     private CompletableFuture<TransactionResult> 
processTxFinishAction(TxFinishReplicaRequest request) {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use 
ZonePartitionIdMessage and remove cast
-        Collection<TablePartitionId> enlistedGroups = 
(Collection<TablePartitionId>) (Collection<?>) request.groups();
+        Map<TablePartitionId, String> enlistedGroups = (Map<TablePartitionId, 
String>) (Map<?, ?>) request.groups();
 
         UUID txId = request.txId();
 
         if (request.commit()) {
             HybridTimestamp commitTimestamp = request.commitTimestamp();
 
-            return schemaCompatValidator.validateCommit(txId, enlistedGroups, 
commitTimestamp)
+            return schemaCompatValidator.validateCommit(txId, 
enlistedGroups.keySet(), commitTimestamp)
                     .thenCompose(validationResult ->
                             finishAndCleanup(
                                     enlistedGroups,
@@ -1597,7 +1509,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     private CompletableFuture<TransactionResult> finishAndCleanup(
-            Collection<TablePartitionId> enlistedPartitions,
+            Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
@@ -1648,7 +1560,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return completedFuture(new TransactionResult(txMeta.txState(), 
txMeta.commitTimestamp()));
         }
 
-        return finishTransaction(enlistedPartitions, txId, commit, 
commitTimestamp)
+        return finishTransaction(enlistedPartitions.keySet(), txId, commit, 
commitTimestamp)
                 .thenCompose(txResult ->
                         txManager.cleanup(enlistedPartitions, commit, 
commitTimestamp, txId)
                                 .thenApply(v -> txResult)
@@ -1748,20 +1660,18 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param request Transaction cleanup request.
      * @return CompletableFuture of void.
      */
-    // TODO: need to properly handle primary replica changes 
https://issues.apache.org/jira/browse/IGNITE-17615
     private CompletableFuture<Void> 
processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
-        try {
-            closeAllTransactionCursors(request.txId());
-        } catch (Exception e) {
-            return failedFuture(e);
-        }
-
-        TxState txState = request.commit() ? COMMITTED : ABORTED;
-
-        markFinished(request.txId(), txState, request.commitTimestamp());
+        markFinished(request.txId(), request.commit() ? COMMITTED : ABORTED, 
request.commitTimestamp());
 
         return awaitCleanupReadyFutures(request.txId(), request.commit())
                 .thenCompose(res -> {
+                    try {
+                        closeAllTransactionCursors(request.txId());
+                    } catch (Exception e) {
+                        // TODO: IGNITE-21293 Should we stop write intent 
switch handling if closing cursors failed?
+                        return failedFuture(e);
+                    }
+
                     if (res.hadUpdateFutures()) {
                         HybridTimestamp commandTimestamp = hybridClock.now();
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 01617fdb25..cc60c1e039 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1589,7 +1589,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         TxFinishReplicaRequest commitRequest = 
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                 .groupId(grpId)
                 .txId(txId)
-                .groups(Set.of(grpId))
+                .groups(Map.of(grpId, localNode.name()))
                 .commit(false)
                 .enlistmentConsistencyToken(1L)
                 .build();
@@ -1653,7 +1653,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         TxFinishReplicaRequest commitRequest = 
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                 .groupId(grpId)
                 .txId(txId)
-                .groups(Set.of(grpId))
+                .groups(Map.of(grpId, localNode.name()))
                 .commit(true)
                 .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
                 .enlistmentConsistencyToken(1L)
@@ -2350,19 +2350,20 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     @Test
     void commitRequestFailsIfCommitPartitionTableWasDropped() {
-        testCommitRequestIfTableWasDropped(grpId, Set.of(grpId), 
grpId.tableId());
+        testCommitRequestIfTableWasDropped(grpId,  Map.of(grpId, 
localNode.name()), grpId.tableId());
     }
 
     @Test
     void commitRequestFailsIfNonCommitPartitionTableWasDropped() {
         TablePartitionId anotherPartitionId = new 
TablePartitionId(ANOTHER_TABLE_ID, 0);
 
-        testCommitRequestIfTableWasDropped(grpId, Set.of(grpId, 
anotherPartitionId), anotherPartitionId.tableId());
+        testCommitRequestIfTableWasDropped(grpId, Map.of(grpId, 
localNode.name(), anotherPartitionId, localNode.name()),
+                anotherPartitionId.tableId());
     }
 
     private void testCommitRequestIfTableWasDropped(
             TablePartitionId commitPartitionId,
-            Set<ReplicationGroupId> groups,
+            Map<ReplicationGroupId, String> groups,
             int tableToBeDroppedId
     ) {
         when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), any(HybridTimestamp.class)))
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 1e68605b50..39c10641c7 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -17,17 +17,18 @@
 
 package org.apache.ignite.internal.tx;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -143,21 +144,21 @@ public interface TxManager extends IgniteComponent {
             HybridTimestampTracker timestampTracker,
             TablePartitionId commitPartition,
             boolean commit,
-            Map<TablePartitionId, Long> enlistedGroups,
+            Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups,
             UUID txId
     );
 
     /**
      * Sends cleanup request to the cluster nodes that hosts primary replicas 
for the enlisted partitions.
      *
-     * @param partitions Enlisted partition groups.
+     * @param enlistedPartitions Enlisted partition groups.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
      * @param txId Transaction id.
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
-            Collection<TablePartitionId> partitions,
+            Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 6a366dd332..91f3b9bae0 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -23,13 +23,11 @@ import static 
org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -172,13 +170,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
      * @return The future of transaction completion.
      */
     private CompletableFuture<Void> finishInternal(boolean commit) {
-        Map<TablePartitionId, Long> enlistedGroups = 
enlisted.entrySet().stream()
-                .collect(Collectors.toMap(
-                        Entry::getKey,
-                        entry -> entry.getValue().get2()
-                ));
-
-        return txManager.finish(observableTsTracker, commitPart, commit, 
enlistedGroups, id());
+        return txManager.finish(observableTsTracker, commitPart, commit, 
enlisted, id());
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index b8f2468cff..9849f647d8 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -21,6 +21,8 @@ import static java.util.concurrent.CompletableFuture.allOf;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -77,19 +79,32 @@ public class TxCleanupRequestSender {
     /**
      * Sends cleanup request to the primary nodes of each one of {@code 
partitions}.
      *
-     * @param partitions Enlisted partition groups.
+     * @param enlistedPartitions Map of enlisted partition group to the 
initial primary node.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
      * @param txId Transaction id.
      * @return Completable future of Void.
      */
     public CompletableFuture<Void> cleanup(
-            Collection<TablePartitionId> partitions,
+            Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
-        return placementDriverHelper.findPrimaryReplicas(partitions)
+        Map<String, Set<TablePartitionId>> partitions = new HashMap<>();
+        enlistedPartitions.forEach((partitionId, nodeId) ->
+                partitions.computeIfAbsent(nodeId, node -> new 
HashSet<>()).add(partitionId));
+
+        return cleanupPartitions(partitions, commit, commitTimestamp, txId);
+    }
+
+    private CompletableFuture<Void> cleanup(
+            Collection<TablePartitionId> partitionIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
+    ) {
+        return placementDriverHelper.findPrimaryReplicas(partitionIds)
                 .thenCompose(partitionData -> {
                     switchWriteIntentsOnPartitions(commit, commitTimestamp, 
txId, partitionData.partitionsWithoutPrimary);
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index cadd9a1164..757a7eaac5 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -36,11 +36,10 @@ import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -57,9 +56,11 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -98,6 +99,7 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
 
@@ -254,10 +256,10 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 TxContext txContext = ctxEntry.getValue();
 
                 if (txContext.isTxFinishing()) {
-                    Long enlistmentConsistencyToken = 
txContext.enlistedGroups.get(groupId);
+                    IgniteBiTuple<ClusterNode, Long> nodeAndToken = 
txContext.enlistedGroups.get(groupId);
 
-                    if (enlistmentConsistencyToken != null) {
-                        txContext.cancelWaitingInflights(groupId, 
enlistmentConsistencyToken);
+                    if (nodeAndToken != null) {
+                        txContext.cancelWaitingInflights(groupId, 
nodeAndToken.get2());
                     }
                 }
             }
@@ -370,7 +372,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
             boolean commitIntent,
-            Map<TablePartitionId, Long> enlistedGroups,
+            Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups,
             UUID txId
     ) {
         LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, 
txId, enlistedGroups);
@@ -417,7 +419,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             }
         }
 
-        TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups, 
commitIntent);
+        TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups);
 
         // Wait for commit acks first, then proceed with the finish request.
         return txContext.performFinish(commitIntent, commit ->
@@ -436,7 +438,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         });
     }
 
-    private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, 
Long> enlistedGroups, boolean commitIntent) {
+    private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, 
IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
         return txCtxMap.compute(txId, (uuid, tuple0) -> {
             if (tuple0 == null) {
                 tuple0 = new TxContext(placementDriver); // No writes enlisted.
@@ -465,7 +467,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
             boolean commit,
-            Map<TablePartitionId, Long> enlistedGroups,
+            Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups,
             UUID txId,
             CompletableFuture<TransactionMeta> txFinishFuture
     ) {
@@ -480,7 +482,11 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                         (unused, throwable) -> {
                             boolean verifiedCommit = throwable == null && 
commit;
 
-                            Collection<ReplicationGroupId> replicationGroupIds 
= new HashSet<>(enlistedGroups.keySet());
+                            Map<ReplicationGroupId, String> 
replicationGroupIds = enlistedGroups.entrySet().stream()
+                                    .collect(Collectors.toMap(
+                                            Entry::getKey,
+                                            entry -> 
entry.getValue().get1().name()
+                                    ));
 
                             return durableFinish(
                                     observableTimestampTracker,
@@ -503,7 +509,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
             boolean commit,
-            Collection<ReplicationGroupId> replicationGroupIds,
+            Map<ReplicationGroupId, String> replicationGroupIds,
             UUID txId,
             HybridTimestamp commitTimestamp,
             CompletableFuture<TransactionMeta> txFinishFuture
@@ -574,7 +580,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             String primaryConsistentId,
             Long enlistmentConsistencyToken,
             boolean commit,
-            Collection<ReplicationGroupId> replicationGroupIds,
+            Map<ReplicationGroupId, String> replicationGroupIds,
             UUID txId,
             HybridTimestamp commitTimestamp,
             CompletableFuture<TransactionMeta> txFinishFuture
@@ -688,12 +694,12 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     @Override
     public CompletableFuture<Void> cleanup(
-            Collection<TablePartitionId> partitions,
+            Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
-        return txCleanupRequestSender.cleanup(partitions, commit, 
commitTimestamp, txId);
+        return txCleanupRequestSender.cleanup(enlistedPartitions, commit, 
commitTimestamp, txId);
     }
 
     @Override
@@ -808,13 +814,16 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
      * @param commitTimestamp Commit timestamp.
      * @return Verification future.
      */
-    private CompletableFuture<Void> 
verifyCommitTimestamp(Map<TablePartitionId, Long> enlistedGroups, 
HybridTimestamp commitTimestamp) {
+    private CompletableFuture<Void> verifyCommitTimestamp(
+            Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups,
+            HybridTimestamp commitTimestamp
+    ) {
         var verificationFutures = new CompletableFuture[enlistedGroups.size()];
         int cnt = -1;
 
-        for (Map.Entry<TablePartitionId, Long> enlistedGroup : 
enlistedGroups.entrySet()) {
+        for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroup : enlistedGroups.entrySet()) {
             TablePartitionId groupId = enlistedGroup.getKey();
-            Long expectedEnlistmentConsistencyToken = enlistedGroup.getValue();
+            Long expectedEnlistmentConsistencyToken = 
enlistedGroup.getValue().get2();
 
             verificationFutures[++cnt] = 
placementDriver.getPrimaryReplica(groupId, commitTimestamp)
                     .thenAccept(currentPrimaryReplica -> {
@@ -845,7 +854,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         private final CompletableFuture<Void> waitRepFut = new 
CompletableFuture<>();
         private final PlacementDriver placementDriver;
         volatile CompletableFuture<Void> finishInProgressFuture = null;
-        volatile Map<TablePartitionId, Long> enlistedGroups;
+        volatile Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups;
 
         private TxContext(PlacementDriver placementDriver) {
             this.placementDriver = placementDriver;
@@ -888,11 +897,13 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         private CompletableFuture<Void> waitReadyToFinish(boolean commit) {
             if (commit) {
-                for (Map.Entry<TablePartitionId, Long> e : 
enlistedGroups.entrySet()) {
+                for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, 
Long>> e : enlistedGroups.entrySet()) {
                     ReplicaMeta replicaMeta = 
placementDriver.currentLease(e.getKey());
 
-                    if (replicaMeta == null || 
!e.getValue().equals(replicaMeta.getStartTime().longValue())) {
-                        return failedFuture(new 
PrimaryReplicaExpiredException(e.getKey(), e.getValue(), null, replicaMeta));
+                    Long enlistmentConsistencyToken = e.getValue().get2();
+
+                    if (replicaMeta == null || 
!enlistmentConsistencyToken.equals(replicaMeta.getStartTime().longValue())) {
+                        return failedFuture(new 
PrimaryReplicaExpiredException(e.getKey(), enlistmentConsistencyToken, null, 
replicaMeta));
                     }
                 }
 
@@ -919,7 +930,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             }
         }
 
-        void finishTx(Map<TablePartitionId, Long> enlistedGroups) {
+        void finishTx(Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlistedGroups) {
             this.enlistedGroups = enlistedGroups;
             finishInProgressFuture = new CompletableFuture<>();
         }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 08c494b014..16b750a857 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.tx.impl;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -140,7 +141,7 @@ public class TxMessageSender {
     public CompletableFuture<TransactionResult> finish(
             String primaryConsistentId,
             TablePartitionId commitPartition,
-            Collection<ReplicationGroupId> replicationGroupIds,
+            Map<ReplicationGroupId, String> replicationGroupIds,
             UUID txId,
             Long consistencyToken,
             boolean commit,
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
index 59dc9fef57..a663f1e8a5 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.tx.message;
 
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 
-import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.annotations.Marshallable;
@@ -72,5 +72,5 @@ public interface TxFinishReplicaRequest extends 
PrimaryReplicaRequest, Timestamp
      * @return Enlisted partition groups aggregated by expected primary 
replica nodes.
      */
     @Marshallable
-    Collection<ReplicationGroupId> groups();
+    Map<ReplicationGroupId, String> groups();
 }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index 29d9d465ea..7580c7edc1 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -35,7 +36,8 @@ import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import java.util.Set;
+import java.io.IOException;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -115,15 +117,15 @@ public class TxCleanupTest extends IgniteAbstractTest {
     }
 
     @Test
-    void testPrimaryFoundForAllPartitions() {
+    void testCleanupAllNodes() {
         TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
         TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0);
         TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0);
 
-        Set<TablePartitionId> partitions = Set.of(tablePartitionId1, 
tablePartitionId2, tablePartitionId3);
-
-        when(placementDriver.getPrimaryReplica(any(), any()))
-                .thenReturn(completedFuture(new 
TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
+        Map<TablePartitionId, String> partitions = Map.of(
+                tablePartitionId1, LOCAL_NODE.name(),
+                tablePartitionId2, LOCAL_NODE.name(),
+                tablePartitionId3, LOCAL_NODE.name());
 
         HybridTimestamp beginTimestamp = clock.now();
         UUID txId = idGenerator.transactionIdFor(beginTimestamp);
@@ -140,12 +142,19 @@ public class TxCleanupTest extends IgniteAbstractTest {
     }
 
     @Test
-    void testPrimaryNotFoundForSome() {
+    void testPrimaryNotFoundForSomeAfterException() {
         TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
         TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0);
         TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0);
 
-        Set<TablePartitionId> partitions = Set.of(tablePartitionId1, 
tablePartitionId2, tablePartitionId3);
+        Map<TablePartitionId, String> partitions = Map.of(
+                tablePartitionId1, LOCAL_NODE.name(),
+                tablePartitionId2, LOCAL_NODE.name(),
+                tablePartitionId3, LOCAL_NODE.name());
+
+        // First cleanup fails:
+        when(messagingService.invoke(anyString(), any(), anyLong()))
+                .thenReturn(failedFuture(new IOException("Test failure")), 
nullCompletedFuture());
 
         when(placementDriver.getPrimaryReplica(any(), any()))
                 .thenReturn(completedFuture(new 
TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
@@ -165,7 +174,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
         assertThat(cleanup, willCompleteSuccessfully());
 
         verify(txMessageSender, times(1)).switchWriteIntents(any(), any(), 
any(), anyBoolean(), any());
-        verify(txMessageSender, times(1)).cleanup(any(), any(), any(), 
anyBoolean(), any());
+        verify(txMessageSender, times(2)).cleanup(any(), any(), any(), 
anyBoolean(), any());
         verifyNoMoreInteractions(txMessageSender);
     }
 
@@ -175,7 +184,14 @@ public class TxCleanupTest extends IgniteAbstractTest {
         TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0);
         TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0);
 
-        Set<TablePartitionId> partitions = Set.of(tablePartitionId1, 
tablePartitionId2, tablePartitionId3);
+        Map<TablePartitionId, String> partitions = Map.of(
+                tablePartitionId1, LOCAL_NODE.name(),
+                tablePartitionId2, LOCAL_NODE.name(),
+                tablePartitionId3, LOCAL_NODE.name());
+
+        // First cleanup fails:
+        when(messagingService.invoke(anyString(), any(), anyLong()))
+                .thenReturn(failedFuture(new IOException("Test failure")), 
nullCompletedFuture());
 
         when(placementDriver.getPrimaryReplica(any(), any()))
                 .thenReturn(nullCompletedFuture());
@@ -193,6 +209,8 @@ public class TxCleanupTest extends IgniteAbstractTest {
         assertThat(cleanup, willCompleteSuccessfully());
 
         verify(txMessageSender, times(3)).switchWriteIntents(any(), any(), 
any(), anyBoolean(), any());
+        verify(txMessageSender, times(1)).cleanup(any(), any(), any(), 
anyBoolean(), any());
+
         verifyNoMoreInteractions(txMessageSender);
     }
 }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 8ee29bfb99..22202edcfd 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -106,6 +106,9 @@ import org.mockito.verification.VerificationMode;
 public class TxManagerTest extends IgniteAbstractTest {
     private static final ClusterNode LOCAL_NODE = new 
ClusterNodeImpl("local_id", "local", new NetworkAddress("127.0.0.1", 2004), 
null);
 
+    private static final ClusterNode REMOTE_NODE =
+            new ClusterNodeImpl("remote_id", "remote", new 
NetworkAddress("127.1.1.1", 2024), null);
+
     private HybridTimestampTracker hybridTimestampTracker = new 
HybridTimestampTracker();
 
     private final LongSupplier idleSafeTimePropagationPeriodMsSupplier = () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
@@ -267,11 +270,9 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         InternalTransaction tx = txManager.begin(hybridTimestampTracker);
 
-        ClusterNode node = mock(ClusterNode.class);
-
         TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
 
-        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L));
+        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L));
         tx.assignCommitPartition(tablePartitionId1);
 
         tx.commit();
@@ -290,11 +291,9 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         InternalTransaction tx = txManager.begin(hybridTimestampTracker);
 
-        ClusterNode node = mock(ClusterNode.class);
-
         TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
 
-        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L));
+        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L));
         tx.assignCommitPartition(tablePartitionId1);
 
         tx.rollback();
@@ -318,11 +317,9 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         InternalTransaction tx = txManager.begin(hybridTimestampTracker);
 
-        ClusterNode node = mock(ClusterNode.class);
-
         TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
 
-        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L));
+        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L));
         tx.assignCommitPartition(tablePartitionId1);
 
         TransactionException transactionException = 
assertThrows(TransactionException.class, tx::commit);
@@ -348,11 +345,9 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         InternalTransaction tx = txManager.begin(hybridTimestampTracker);
 
-        ClusterNode node = mock(ClusterNode.class);
-
         TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
 
-        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L));
+        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L));
         tx.assignCommitPartition(tablePartitionId1);
 
         TransactionException transactionException = 
assertThrows(TransactionException.class, tx::rollback);
@@ -737,10 +732,9 @@ public class TxManagerTest extends IgniteAbstractTest {
     private InternalTransaction prepareTransaction() {
         InternalTransaction tx = txManager.begin(hybridTimestampTracker);
 
-        ClusterNode node = mock(ClusterNode.class);
-
         TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
-        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L));
+
+        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L));
         tx.assignCommitPartition(tablePartitionId1);
 
         return tx;

Reply via email to