This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 7a26a6714d IGNITE-21213 Coordination of mechanisms of determination for primar (#3097) 7a26a6714d is described below commit 7a26a6714d668968fe0b69ed3664672d035e01e7 Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Tue Feb 6 16:28:54 2024 +0300 IGNITE-21213 Coordination of mechanisms of determination for primar (#3097) --- .../apache/ignite/client/fakes/FakeTxManager.java | 5 +- .../ItPrimaryReplicaChoiceTest.java | 6 ++ .../ignite/internal/replicator/ReplicaManager.java | 56 +--------- .../replicator/listener/ReplicaListener.java | 18 ---- .../ignite/internal/table/ItColocationTest.java | 3 +- .../internal/table/ItTransactionRecoveryTest.java | 3 +- .../replicator/PartitionReplicaListener.java | 120 +++------------------ .../replication/PartitionReplicaListenerTest.java | 11 +- .../org/apache/ignite/internal/tx/TxManager.java | 9 +- .../internal/tx/impl/ReadWriteTransactionImpl.java | 10 +- .../internal/tx/impl/TxCleanupRequestSender.java | 21 +++- .../ignite/internal/tx/impl/TxManagerImpl.java | 55 ++++++---- .../ignite/internal/tx/impl/TxMessageSender.java | 3 +- .../tx/message/TxFinishReplicaRequest.java | 4 +- .../apache/ignite/internal/tx/TxCleanupTest.java | 38 +++++-- .../apache/ignite/internal/tx/TxManagerTest.java | 24 ++--- 16 files changed, 135 insertions(+), 251 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index aeafe657c6..72e1615f8e 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -19,7 +19,6 @@ package org.apache.ignite.client.fakes; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -186,7 +185,7 @@ public class FakeTxManager implements TxManager { HybridTimestampTracker timestampTracker, TablePartitionId commitPartition, boolean commit, - Map<TablePartitionId, Long> enlistedGroups, + Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, UUID txId ) { return nullCompletedFuture(); @@ -194,7 +193,7 @@ public class FakeTxManager implements TxManager { @Override public CompletableFuture<Void> cleanup( - Collection<TablePartitionId> partitions, + Map<TablePartitionId, String> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java index f8d8e7b54f..40e8e00455 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java @@ -246,6 +246,12 @@ public class ItPrimaryReplicaChoiceTest extends ClusterPerTestIntegrationTest { NodeUtils.transferPrimary(tbl, null, this::node); + assertTrue(ignite.txManager().lockManager().locks(rwTx.id()).hasNext()); + assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors()); + + rwTx.rollback(); + + assertFalse(ignite.txManager().lockManager().locks(rwTx.id()).hasNext()); assertEquals(3, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors()); } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index ba75082063..1f088fe3c2 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -21,8 +21,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED; -import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; -import static org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; @@ -57,8 +55,6 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.NetworkMessageHandler; import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; -import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup; import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory; import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage; @@ -232,9 +228,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc new LinkedBlockingQueue<>(), NamedThreadFactory.create(nodeName, "replica", LOG) ); - - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this::onPrimaryReplicaElected); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryReplicaExpired); } private void onReplicaMessageReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) { @@ -532,6 +525,10 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc .thenCompose(v -> replicaFuture); } + private static boolean isCompletedSuccessfully(CompletableFuture<?> future) { + return future.isDone() && !future.isCompletedExceptionally(); + } + /** * Stops a replica by the partition group id. * @@ -750,51 +747,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc } } - - /** - * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}. Propagates execution to the - * {@link ReplicaListener#onPrimaryElected(PrimaryReplicaEventParameters, Throwable)} of the replica, that corresponds - * to a given {@link PrimaryReplicaEventParameters#groupId()}. - */ - private CompletableFuture<Boolean> onPrimaryReplicaElected( - PrimaryReplicaEventParameters primaryReplicaEventParameters, - Throwable throwable - ) { - CompletableFuture<Replica> replica = replicas.get(primaryReplicaEventParameters.groupId()); - - if (replica == null) { - return falseCompletedFuture(); - } - - if (replica.isDone() && !replica.isCompletedExceptionally()) { - return replica.join().replicaListener().onPrimaryElected(primaryReplicaEventParameters, throwable); - } else { - return replica.thenCompose(r -> r.replicaListener().onPrimaryElected(primaryReplicaEventParameters, throwable)); - } - } - - /** - * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED}. Propagates execution to the - * {@link ReplicaListener#onPrimaryExpired(PrimaryReplicaEventParameters, Throwable)} of the replica, that corresponds - * to a given {@link PrimaryReplicaEventParameters#groupId()}. - */ - private CompletableFuture<Boolean> onPrimaryReplicaExpired( - PrimaryReplicaEventParameters primaryReplicaEventParameters, - Throwable throwable - ) { - CompletableFuture<Replica> replica = replicas.get(primaryReplicaEventParameters.groupId()); - - if (replica == null) { - return falseCompletedFuture(); - } - - if (replica.isDone() && !replica.isCompletedExceptionally()) { - return replica.join().replicaListener().onPrimaryExpired(primaryReplicaEventParameters, throwable); - } else { - return replica.thenCompose(r -> r.replicaListener().onPrimaryExpired(primaryReplicaEventParameters, throwable)); - } - } - /** * Idle safe time sync for replicas. */ diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java index eeee59b449..88a1937e97 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java @@ -18,12 +18,8 @@ package org.apache.ignite.internal.replicator.listener; import java.util.concurrent.CompletableFuture; -import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; -import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.message.ReplicaRequest; -import org.apache.ignite.internal.util.CompletableFutures; -import org.jetbrains.annotations.Nullable; /** Replica listener. */ @FunctionalInterface @@ -37,20 +33,6 @@ public interface ReplicaListener { */ CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId); - /** - * Invoked by {@link ReplicaManager} when current replica is elected as primary. - */ - default CompletableFuture<Boolean> onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { - return CompletableFutures.falseCompletedFuture(); - } - - /** - * Invoked by {@link ReplicaManager} then current replica stops being a primary replica. - */ - default CompletableFuture<Boolean> onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { - return CompletableFutures.falseCompletedFuture(); - } - /** Callback on replica shutdown. */ default void onShutdown() { // No-op. diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index ee58ccc2fa..d65d494523 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -60,6 +60,7 @@ import java.util.stream.Stream; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.placementdriver.TestPlacementDriver; @@ -176,7 +177,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest { HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commitIntent, - Map<TablePartitionId, Long> enlistedGroups, + Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, UUID txId ) { return nullCompletedFuture(); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java index bf12690805..7236d54ee5 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java @@ -42,6 +42,7 @@ import java.util.stream.IntStream; import org.apache.ignite.InitParametersBuilder; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.DefaultMessagingService; import org.apache.ignite.internal.network.NetworkMessage; @@ -614,7 +615,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { new HybridTimestampTracker(), ((InternalTransaction) rwTx1).commitPartition(), false, - Map.of(((InternalTransaction) rwTx1).commitPartition(), 0L), + Map.of(((InternalTransaction) rwTx1).commitPartition(), new IgniteBiTuple<>(txCrdNode2.node(), 0L)), rwTx1Id ); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 6b83d12eae..ed0d733ee8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -47,7 +47,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; -import static org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -84,12 +83,10 @@ import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteTriFunction; import org.apache.ignite.internal.lang.IgniteUuid; -import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.lang.SafeTimeReorderException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.replicator.ReplicaResult; @@ -360,79 +357,6 @@ public class PartitionReplicaListener implements ReplicaListener { prepareIndexBuilderTxRwOperationTracker(); } - private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) { - Collection<TablePartitionId> enlistedPartitions = txMeta.enlistedPartitions(); - - boolean commit = txMeta.txState() == COMMITTED; - - HybridTimestamp commitTimestamp = txMeta.commitTimestamp(); - - return txManager.cleanup(enlistedPartitions, commit, commitTimestamp, txId) - .handle((v, e) -> { - if (e != null) { - LOG.warn("Failed to execute cleanup on commit partition primary replica switch [txId={}, commitPartition={}]", - e, txId, replicationGroupId); - - if (hasCause(e, null, NodeStoppingException.class)) { - return nullCompletedFuture(); - } else { - return txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta)); - } - } else { - return nullCompletedFuture(); - } - }) - .thenCompose(f -> f); - } - - @Override - public CompletableFuture<Boolean> onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { - assert replicationGroupId.equals(evt.groupId()) : format( - "The replication group listener does not match the event [grp={}, eventGrp={}]", - replicationGroupId, - evt.groupId() - ); - - if (!localNode.id().equals(evt.leaseholderId())) { - return falseCompletedFuture(); - } - - LOG.info("Primary replica expired [grp={}]", replicationGroupId); - - ArrayList<CompletableFuture<?>> futs = new ArrayList<>(); - - for (UUID txId : txCleanupReadyFutures.keySet()) { - txCleanupReadyFutures.compute(txId, (id, txOps) -> { - if (txOps == null) { - return null; - } - - if (!txOps.futures.isEmpty()) { - CompletableFuture<?>[] txFuts = txOps.futures.values().stream() - .flatMap(Collection::stream) - .toArray(CompletableFuture[]::new); - - futs.add(allOf(txFuts).whenComplete((unused, throwable) -> { - releaseTxLocks(txId); - - try { - closeAllTransactionCursors(txId); - } catch (Exception e) { - LOG.warn("Unable to clear resource for transaction on primary replica expiration [tx={}, replicationGrp={}]", e, - txId, replicationGroupId); - } - })); - - txOps.futures.clear(); - } - - return txOps; - }); - } - - return allOf(futs.toArray(CompletableFuture[]::new)).thenApply(unused -> false); - } - @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) { return ensureReplicaIsPrimary(request) @@ -490,10 +414,7 @@ public class PartitionReplicaListener implements ReplicaListener { // Check whether a transaction has already been finished. if (txMeta != null && isFinalState(txMeta.txState())) { - return recoverFinishedTx(txId, txMeta) - // If the sender has sent a recovery message, it failed to handle it on its own, - // so sending cleanup to the sender for the transaction we know is finished. - .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId)); + return runCleanupOnNode(txId, senderId); } LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", txId, txMeta); @@ -501,16 +422,6 @@ public class PartitionReplicaListener implements ReplicaListener { return triggerTxRecovery(txId, senderId); } - private CompletableFuture<Void> recoverFinishedTx(UUID txId, TxMeta txMeta) { - if (txMeta.enlistedPartitions().isEmpty()) { - // Nothing to do if there are no enlistedPartitions available. - return nullCompletedFuture(); - } - - // Otherwise run a cleanup on the known set of partitions. - return (CompletableFuture<Void>) durableCleanup(txId, txMeta); - } - /** * Run cleanup on a node. * @@ -538,7 +449,8 @@ public class PartitionReplicaListener implements ReplicaListener { new HybridTimestampTracker(), replicationGroupId, false, - Map.of(replicationGroupId, 0L), // term is not required for the rollback. + // term is not required for the rollback. + Map.of(replicationGroupId, new IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)), txId ) .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId)); @@ -1552,14 +1464,14 @@ public class PartitionReplicaListener implements ReplicaListener { // TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615 private CompletableFuture<TransactionResult> processTxFinishAction(TxFinishReplicaRequest request) { // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast - Collection<TablePartitionId> enlistedGroups = (Collection<TablePartitionId>) (Collection<?>) request.groups(); + Map<TablePartitionId, String> enlistedGroups = (Map<TablePartitionId, String>) (Map<?, ?>) request.groups(); UUID txId = request.txId(); if (request.commit()) { HybridTimestamp commitTimestamp = request.commitTimestamp(); - return schemaCompatValidator.validateCommit(txId, enlistedGroups, commitTimestamp) + return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp) .thenCompose(validationResult -> finishAndCleanup( enlistedGroups, @@ -1597,7 +1509,7 @@ public class PartitionReplicaListener implements ReplicaListener { } private CompletableFuture<TransactionResult> finishAndCleanup( - Collection<TablePartitionId> enlistedPartitions, + Map<TablePartitionId, String> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -1648,7 +1560,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())); } - return finishTransaction(enlistedPartitions, txId, commit, commitTimestamp) + return finishTransaction(enlistedPartitions.keySet(), txId, commit, commitTimestamp) .thenCompose(txResult -> txManager.cleanup(enlistedPartitions, commit, commitTimestamp, txId) .thenApply(v -> txResult) @@ -1748,20 +1660,18 @@ public class PartitionReplicaListener implements ReplicaListener { * @param request Transaction cleanup request. * @return CompletableFuture of void. */ - // TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615 private CompletableFuture<Void> processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) { - try { - closeAllTransactionCursors(request.txId()); - } catch (Exception e) { - return failedFuture(e); - } - - TxState txState = request.commit() ? COMMITTED : ABORTED; - - markFinished(request.txId(), txState, request.commitTimestamp()); + markFinished(request.txId(), request.commit() ? COMMITTED : ABORTED, request.commitTimestamp()); return awaitCleanupReadyFutures(request.txId(), request.commit()) .thenCompose(res -> { + try { + closeAllTransactionCursors(request.txId()); + } catch (Exception e) { + // TODO: IGNITE-21293 Should we stop write intent switch handling if closing cursors failed? + return failedFuture(e); + } + if (res.hadUpdateFutures()) { HybridTimestamp commandTimestamp = hybridClock.now(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 01617fdb25..cc60c1e039 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -1589,7 +1589,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { TxFinishReplicaRequest commitRequest = TX_MESSAGES_FACTORY.txFinishReplicaRequest() .groupId(grpId) .txId(txId) - .groups(Set.of(grpId)) + .groups(Map.of(grpId, localNode.name())) .commit(false) .enlistmentConsistencyToken(1L) .build(); @@ -1653,7 +1653,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { TxFinishReplicaRequest commitRequest = TX_MESSAGES_FACTORY.txFinishReplicaRequest() .groupId(grpId) .txId(txId) - .groups(Set.of(grpId)) + .groups(Map.of(grpId, localNode.name())) .commit(true) .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) .enlistmentConsistencyToken(1L) @@ -2350,19 +2350,20 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @Test void commitRequestFailsIfCommitPartitionTableWasDropped() { - testCommitRequestIfTableWasDropped(grpId, Set.of(grpId), grpId.tableId()); + testCommitRequestIfTableWasDropped(grpId, Map.of(grpId, localNode.name()), grpId.tableId()); } @Test void commitRequestFailsIfNonCommitPartitionTableWasDropped() { TablePartitionId anotherPartitionId = new TablePartitionId(ANOTHER_TABLE_ID, 0); - testCommitRequestIfTableWasDropped(grpId, Set.of(grpId, anotherPartitionId), anotherPartitionId.tableId()); + testCommitRequestIfTableWasDropped(grpId, Map.of(grpId, localNode.name(), anotherPartitionId, localNode.name()), + anotherPartitionId.tableId()); } private void testCommitRequestIfTableWasDropped( TablePartitionId commitPartitionId, - Set<ReplicationGroupId> groups, + Map<ReplicationGroupId, String> groups, int tableToBeDroppedId ) { when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class))) diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index 1e68605b50..39c10641c7 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -17,17 +17,18 @@ package org.apache.ignite.internal.tx; -import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.lang.ErrorGroups.Transactions; +import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -143,21 +144,21 @@ public interface TxManager extends IgniteComponent { HybridTimestampTracker timestampTracker, TablePartitionId commitPartition, boolean commit, - Map<TablePartitionId, Long> enlistedGroups, + Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, UUID txId ); /** * Sends cleanup request to the cluster nodes that hosts primary replicas for the enlisted partitions. * - * @param partitions Enlisted partition groups. + * @param enlistedPartitions Enlisted partition groups. * @param commit {@code true} if a commit requested. * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). * @param txId Transaction id. * @return Completable future of Void. */ CompletableFuture<Void> cleanup( - Collection<TablePartitionId> partitions, + Map<TablePartitionId, String> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index 6a366dd332..91f3b9bae0 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -23,13 +23,11 @@ import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; import java.util.Map; -import java.util.Map.Entry; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -172,13 +170,7 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { * @return The future of transaction completion. */ private CompletableFuture<Void> finishInternal(boolean commit) { - Map<TablePartitionId, Long> enlistedGroups = enlisted.entrySet().stream() - .collect(Collectors.toMap( - Entry::getKey, - entry -> entry.getValue().get2() - )); - - return txManager.finish(observableTsTracker, commitPart, commit, enlistedGroups, id()); + return txManager.finish(observableTsTracker, commitPart, commit, enlisted, id()); } /** {@inheritDoc} */ diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java index b8f2468cff..9849f647d8 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -21,6 +21,8 @@ import static java.util.concurrent.CompletableFuture.allOf; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -77,19 +79,32 @@ public class TxCleanupRequestSender { /** * Sends cleanup request to the primary nodes of each one of {@code partitions}. * - * @param partitions Enlisted partition groups. + * @param enlistedPartitions Map of enlisted partition group to the initial primary node. * @param commit {@code true} if a commit requested. * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). * @param txId Transaction id. * @return Completable future of Void. */ public CompletableFuture<Void> cleanup( - Collection<TablePartitionId> partitions, + Map<TablePartitionId, String> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId ) { - return placementDriverHelper.findPrimaryReplicas(partitions) + Map<String, Set<TablePartitionId>> partitions = new HashMap<>(); + enlistedPartitions.forEach((partitionId, nodeId) -> + partitions.computeIfAbsent(nodeId, node -> new HashSet<>()).add(partitionId)); + + return cleanupPartitions(partitions, commit, commitTimestamp, txId); + } + + private CompletableFuture<Void> cleanup( + Collection<TablePartitionId> partitionIds, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + return placementDriverHelper.findPrimaryReplicas(partitionIds) .thenCompose(partitionData -> { switchWriteIntentsOnPartitions(commit, commitTimestamp, txId, partitionData.partitionsWithoutPrimary); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index cadd9a1164..757a7eaac5 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -36,11 +36,10 @@ import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR; import java.io.IOException; -import java.util.Collection; import java.util.Comparator; -import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -57,9 +56,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; @@ -98,6 +99,7 @@ import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.TopologyService; import org.jetbrains.annotations.Nullable; @@ -254,10 +256,10 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { TxContext txContext = ctxEntry.getValue(); if (txContext.isTxFinishing()) { - Long enlistmentConsistencyToken = txContext.enlistedGroups.get(groupId); + IgniteBiTuple<ClusterNode, Long> nodeAndToken = txContext.enlistedGroups.get(groupId); - if (enlistmentConsistencyToken != null) { - txContext.cancelWaitingInflights(groupId, enlistmentConsistencyToken); + if (nodeAndToken != null) { + txContext.cancelWaitingInflights(groupId, nodeAndToken.get2()); } } } @@ -370,7 +372,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commitIntent, - Map<TablePartitionId, Long> enlistedGroups, + Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, UUID txId ) { LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, txId, enlistedGroups); @@ -417,7 +419,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } } - TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups, commitIntent); + TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups); // Wait for commit acks first, then proceed with the finish request. return txContext.performFinish(commitIntent, commit -> @@ -436,7 +438,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { }); } - private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, Long> enlistedGroups, boolean commitIntent) { + private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups) { return txCtxMap.compute(txId, (uuid, tuple0) -> { if (tuple0 == null) { tuple0 = new TxContext(placementDriver); // No writes enlisted. @@ -465,7 +467,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commit, - Map<TablePartitionId, Long> enlistedGroups, + Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, UUID txId, CompletableFuture<TransactionMeta> txFinishFuture ) { @@ -480,7 +482,11 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { (unused, throwable) -> { boolean verifiedCommit = throwable == null && commit; - Collection<ReplicationGroupId> replicationGroupIds = new HashSet<>(enlistedGroups.keySet()); + Map<ReplicationGroupId, String> replicationGroupIds = enlistedGroups.entrySet().stream() + .collect(Collectors.toMap( + Entry::getKey, + entry -> entry.getValue().get1().name() + )); return durableFinish( observableTimestampTracker, @@ -503,7 +509,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commit, - Collection<ReplicationGroupId> replicationGroupIds, + Map<ReplicationGroupId, String> replicationGroupIds, UUID txId, HybridTimestamp commitTimestamp, CompletableFuture<TransactionMeta> txFinishFuture @@ -574,7 +580,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { String primaryConsistentId, Long enlistmentConsistencyToken, boolean commit, - Collection<ReplicationGroupId> replicationGroupIds, + Map<ReplicationGroupId, String> replicationGroupIds, UUID txId, HybridTimestamp commitTimestamp, CompletableFuture<TransactionMeta> txFinishFuture @@ -688,12 +694,12 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { @Override public CompletableFuture<Void> cleanup( - Collection<TablePartitionId> partitions, + Map<TablePartitionId, String> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId ) { - return txCleanupRequestSender.cleanup(partitions, commit, commitTimestamp, txId); + return txCleanupRequestSender.cleanup(enlistedPartitions, commit, commitTimestamp, txId); } @Override @@ -808,13 +814,16 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { * @param commitTimestamp Commit timestamp. * @return Verification future. */ - private CompletableFuture<Void> verifyCommitTimestamp(Map<TablePartitionId, Long> enlistedGroups, HybridTimestamp commitTimestamp) { + private CompletableFuture<Void> verifyCommitTimestamp( + Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, + HybridTimestamp commitTimestamp + ) { var verificationFutures = new CompletableFuture[enlistedGroups.size()]; int cnt = -1; - for (Map.Entry<TablePartitionId, Long> enlistedGroup : enlistedGroups.entrySet()) { + for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroup : enlistedGroups.entrySet()) { TablePartitionId groupId = enlistedGroup.getKey(); - Long expectedEnlistmentConsistencyToken = enlistedGroup.getValue(); + Long expectedEnlistmentConsistencyToken = enlistedGroup.getValue().get2(); verificationFutures[++cnt] = placementDriver.getPrimaryReplica(groupId, commitTimestamp) .thenAccept(currentPrimaryReplica -> { @@ -845,7 +854,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { private final CompletableFuture<Void> waitRepFut = new CompletableFuture<>(); private final PlacementDriver placementDriver; volatile CompletableFuture<Void> finishInProgressFuture = null; - volatile Map<TablePartitionId, Long> enlistedGroups; + volatile Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups; private TxContext(PlacementDriver placementDriver) { this.placementDriver = placementDriver; @@ -888,11 +897,13 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { private CompletableFuture<Void> waitReadyToFinish(boolean commit) { if (commit) { - for (Map.Entry<TablePartitionId, Long> e : enlistedGroups.entrySet()) { + for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> e : enlistedGroups.entrySet()) { ReplicaMeta replicaMeta = placementDriver.currentLease(e.getKey()); - if (replicaMeta == null || !e.getValue().equals(replicaMeta.getStartTime().longValue())) { - return failedFuture(new PrimaryReplicaExpiredException(e.getKey(), e.getValue(), null, replicaMeta)); + Long enlistmentConsistencyToken = e.getValue().get2(); + + if (replicaMeta == null || !enlistmentConsistencyToken.equals(replicaMeta.getStartTime().longValue())) { + return failedFuture(new PrimaryReplicaExpiredException(e.getKey(), enlistmentConsistencyToken, null, replicaMeta)); } } @@ -919,7 +930,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } } - void finishTx(Map<TablePartitionId, Long> enlistedGroups) { + void finishTx(Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups) { this.enlistedGroups = enlistedGroups; finishInProgressFuture = new CompletableFuture<>(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java index 08c494b014..16b750a857 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.tx.impl; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import java.util.Collection; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridClock; @@ -140,7 +141,7 @@ public class TxMessageSender { public CompletableFuture<TransactionResult> finish( String primaryConsistentId, TablePartitionId commitPartition, - Collection<ReplicationGroupId> replicationGroupIds, + Map<ReplicationGroupId, String> replicationGroupIds, UUID txId, Long consistencyToken, boolean commit, diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java index 59dc9fef57..a663f1e8a5 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.tx.message; import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; -import java.util.Collection; +import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Marshallable; @@ -72,5 +72,5 @@ public interface TxFinishReplicaRequest extends PrimaryReplicaRequest, Timestamp * @return Enlisted partition groups aggregated by expected primary replica nodes. */ @Marshallable - Collection<ReplicationGroupId> groups(); + Map<ReplicationGroupId, String> groups(); } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java index 29d9d465ea..7580c7edc1 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.tx; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -35,7 +36,8 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.util.Set; +import java.io.IOException; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridClock; @@ -115,15 +117,15 @@ public class TxCleanupTest extends IgniteAbstractTest { } @Test - void testPrimaryFoundForAllPartitions() { + void testCleanupAllNodes() { TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); - Set<TablePartitionId> partitions = Set.of(tablePartitionId1, tablePartitionId2, tablePartitionId3); - - when(placementDriver.getPrimaryReplica(any(), any())) - .thenReturn(completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + Map<TablePartitionId, String> partitions = Map.of( + tablePartitionId1, LOCAL_NODE.name(), + tablePartitionId2, LOCAL_NODE.name(), + tablePartitionId3, LOCAL_NODE.name()); HybridTimestamp beginTimestamp = clock.now(); UUID txId = idGenerator.transactionIdFor(beginTimestamp); @@ -140,12 +142,19 @@ public class TxCleanupTest extends IgniteAbstractTest { } @Test - void testPrimaryNotFoundForSome() { + void testPrimaryNotFoundForSomeAfterException() { TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); - Set<TablePartitionId> partitions = Set.of(tablePartitionId1, tablePartitionId2, tablePartitionId3); + Map<TablePartitionId, String> partitions = Map.of( + tablePartitionId1, LOCAL_NODE.name(), + tablePartitionId2, LOCAL_NODE.name(), + tablePartitionId3, LOCAL_NODE.name()); + + // First cleanup fails: + when(messagingService.invoke(anyString(), any(), anyLong())) + .thenReturn(failedFuture(new IOException("Test failure")), nullCompletedFuture()); when(placementDriver.getPrimaryReplica(any(), any())) .thenReturn(completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); @@ -165,7 +174,7 @@ public class TxCleanupTest extends IgniteAbstractTest { assertThat(cleanup, willCompleteSuccessfully()); verify(txMessageSender, times(1)).switchWriteIntents(any(), any(), any(), anyBoolean(), any()); - verify(txMessageSender, times(1)).cleanup(any(), any(), any(), anyBoolean(), any()); + verify(txMessageSender, times(2)).cleanup(any(), any(), any(), anyBoolean(), any()); verifyNoMoreInteractions(txMessageSender); } @@ -175,7 +184,14 @@ public class TxCleanupTest extends IgniteAbstractTest { TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); - Set<TablePartitionId> partitions = Set.of(tablePartitionId1, tablePartitionId2, tablePartitionId3); + Map<TablePartitionId, String> partitions = Map.of( + tablePartitionId1, LOCAL_NODE.name(), + tablePartitionId2, LOCAL_NODE.name(), + tablePartitionId3, LOCAL_NODE.name()); + + // First cleanup fails: + when(messagingService.invoke(anyString(), any(), anyLong())) + .thenReturn(failedFuture(new IOException("Test failure")), nullCompletedFuture()); when(placementDriver.getPrimaryReplica(any(), any())) .thenReturn(nullCompletedFuture()); @@ -193,6 +209,8 @@ public class TxCleanupTest extends IgniteAbstractTest { assertThat(cleanup, willCompleteSuccessfully()); verify(txMessageSender, times(3)).switchWriteIntents(any(), any(), any(), anyBoolean(), any()); + verify(txMessageSender, times(1)).cleanup(any(), any(), any(), anyBoolean(), any()); + verifyNoMoreInteractions(txMessageSender); } } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 8ee29bfb99..22202edcfd 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -106,6 +106,9 @@ import org.mockito.verification.VerificationMode; public class TxManagerTest extends IgniteAbstractTest { private static final ClusterNode LOCAL_NODE = new ClusterNodeImpl("local_id", "local", new NetworkAddress("127.0.0.1", 2004), null); + private static final ClusterNode REMOTE_NODE = + new ClusterNodeImpl("remote_id", "remote", new NetworkAddress("127.1.1.1", 2024), null); + private HybridTimestampTracker hybridTimestampTracker = new HybridTimestampTracker(); private final LongSupplier idleSafeTimePropagationPeriodMsSupplier = () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; @@ -267,11 +270,9 @@ public class TxManagerTest extends IgniteAbstractTest { InternalTransaction tx = txManager.begin(hybridTimestampTracker); - ClusterNode node = mock(ClusterNode.class); - TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L)); + tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); tx.commit(); @@ -290,11 +291,9 @@ public class TxManagerTest extends IgniteAbstractTest { InternalTransaction tx = txManager.begin(hybridTimestampTracker); - ClusterNode node = mock(ClusterNode.class); - TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L)); + tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); tx.rollback(); @@ -318,11 +317,9 @@ public class TxManagerTest extends IgniteAbstractTest { InternalTransaction tx = txManager.begin(hybridTimestampTracker); - ClusterNode node = mock(ClusterNode.class); - TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L)); + tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); TransactionException transactionException = assertThrows(TransactionException.class, tx::commit); @@ -348,11 +345,9 @@ public class TxManagerTest extends IgniteAbstractTest { InternalTransaction tx = txManager.begin(hybridTimestampTracker); - ClusterNode node = mock(ClusterNode.class); - TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L)); + tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); TransactionException transactionException = assertThrows(TransactionException.class, tx::rollback); @@ -737,10 +732,9 @@ public class TxManagerTest extends IgniteAbstractTest { private InternalTransaction prepareTransaction() { InternalTransaction tx = txManager.begin(hybridTimestampTracker); - ClusterNode node = mock(ClusterNode.class); - TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L)); + + tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); return tx;