This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4c7cf15e52 IGNITE-21071 Rollback the transaction on primary failure if replication is not finished (#3030) 4c7cf15e52 is described below commit 4c7cf15e520eee0097712e78591150981307dac6 Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Thu Jan 18 19:50:49 2024 +0300 IGNITE-21071 Rollback the transaction on primary failure if replication is not finished (#3030) --- .../ignite/client/handler/FakePlacementDriver.java | 11 + .../internal/compute/ItWorkerShutdownTest.java | 14 +- .../internal/index/IndexBuildControllerTest.java | 5 + .../internal/placementdriver/PlacementDriver.java | 10 + .../placementdriver/TestPlacementDriver.java | 5 + .../placementdriver/leases/LeaseTracker.java | 64 +++-- .../ignite/internal/raft/RaftGroupServiceImpl.java | 9 +- .../ignite/internal/table/ItDurableFinishTest.java | 19 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 7 + .../java/org/apache/ignite/internal/Cluster.java | 25 ++ .../internal/ClusterPerTestIntegrationTest.java | 17 ++ .../ignite/internal/table/ItColocationTest.java | 2 +- .../internal/table/ItTransactionRecoveryTest.java | 300 ++++++++++++++------- .../ignite/internal/table/TxAbstractTest.java | 16 +- .../tx/TransactionAlreadyFinishedException.java | 8 +- .../tx/impl/PrimaryReplicaExpiredException.java | 2 +- .../ignite/internal/tx/impl/TxManagerImpl.java | 158 ++++++++--- .../tx/AbstractDeadlockPreventionTest.java | 17 ++ .../apache/ignite/internal/tx/TxManagerTest.java | 15 +- 19 files changed, 532 insertions(+), 172 deletions(-) diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java index 5853de37f1..e110c38fe7 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java @@ -103,6 +103,17 @@ public class FakePlacementDriver extends AbstractEventProducer<PrimaryReplicaEve return nullCompletedFuture(); } + @Override + public ReplicaMeta currentLease(ReplicationGroupId groupId) { + TablePartitionId id = (TablePartitionId) groupId; + + if (returnError) { + throw new RuntimeException("FakePlacementDriver expected error"); + } else { + return primaryReplicas.get(id.partitionId()); + } + } + private static ReplicaMeta getReplicaMeta(String leaseholder, long leaseStartTime) { //noinspection serial return new ReplicaMeta() { diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java index 2dcac4163c..307ae2c627 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java @@ -223,7 +223,7 @@ class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { UUID jobIdBeforeFail = idSync(execution); // When stop worker node. - stopNode(workerNodeName); + stopNodeByName(workerNodeName); // And remove it from candidates. remoteWorkerCandidates.remove(workerNodeName); @@ -273,7 +273,7 @@ class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { checkGlobalInteractiveJobAlive(execution); // When stop worker node. - stopNode(workerNodeName); + stopNodeByName(workerNodeName); // Then the job is failed, because there is no any failover worker. assertThat(execution.resultAsync(), willThrow(IgniteException.class)); @@ -295,7 +295,7 @@ class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { assertThat(getWorkerNodeNameFromGlobalInteractiveJob(), equalTo(entryNode.name())); // When stop entry node. - stopNode(entryNode.name()); + stopNodeByName(entryNode.name()); // Then the job is failed, because there is no any failover worker. assertThat(execution.resultAsync().isCompletedExceptionally(), equalTo(true)); @@ -321,7 +321,7 @@ class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { executions.forEach(ItWorkerShutdownTest::checkInteractiveJobAlive); // When stop one of workers. - stopNode(node(1).name()); + stopNodeByName(node(1).name()); // Then two jobs are alive. executions.forEach((node, execution) -> { @@ -355,7 +355,7 @@ class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { checkGlobalInteractiveJobAlive(execution); // When stop worker node. - stopNode(workerNodeName); + stopNodeByName(workerNodeName); // And remove it from candidates. remoteWorkerCandidates.remove(workerNodeName); @@ -417,10 +417,10 @@ class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { } private void stopNode(IgniteImpl ignite) { - stopNode(ignite.name()); + stopNodeByName(ignite.name()); } - private void stopNode(String name) { + private void stopNodeByName(String name) { int ind = NODES_NAMES_TO_INDEXES.get(name); node(ind).stop(); } diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java index 1b5d6f1169..2423e913f7 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java @@ -299,6 +299,11 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { throw new UnsupportedOperationException(); } + @Override + public ReplicaMeta currentLease(ReplicationGroupId groupId) { + return primaryReplicaMetaFutureById.get(groupId).join(); + } + CompletableFuture<Void> setPrimaryReplicaMeta( long causalityToken, TablePartitionId replicaId, diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java index 73d887cbb4..fbc3d564c7 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.jetbrains.annotations.Nullable; /** * Service that provides an ability to await and retrieve primary replicas for replication groups. @@ -77,4 +78,13 @@ public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent, Prim * @return Future. */ CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId); + + /** + * Returns the current knowledge about the lease on the local node. + * + * @param groupId Group id. + * @return Current lease. + */ + @Nullable + ReplicaMeta currentLease(ReplicationGroupId groupId); } diff --git a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java index 9fd5940041..9ea1b14ce0 100644 --- a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java +++ b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java @@ -87,6 +87,11 @@ public class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEve return nullCompletedFuture(); } + @Override + public ReplicaMeta currentLease(ReplicationGroupId groupId) { + return getReplicaMetaFuture().join(); + } + @Override public CompletableFuture<Void> fireEvent(PrimaryReplicaEvent event, PrimaryReplicaEventParameters parameters) { return super.fireEvent(event, parameters); diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index d44d491fd7..fc8e50e40c 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -135,6 +135,15 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri return expirationFutureByGroup.getOrDefault(grpId, nullCompletedFuture()); } + @Override + public ReplicaMeta currentLease(ReplicationGroupId groupId) { + return inBusyLock(busyLock, () -> { + Lease lease = getLease(groupId); + + return lease.isAccepted() ? lease : null; + }); + } + /** * Gets a lease for a particular group. * @@ -162,6 +171,7 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri public CompletableFuture<Void> onUpdate(WatchEvent event) { return inBusyLockAsync(busyLock, () -> { List<CompletableFuture<?>> fireEventFutures = new ArrayList<>(); + List<Lease> expiredLeases = new ArrayList<>(); for (EntryEvent entry : event.entryEvents()) { Entry msEntry = entry.newEntry(); @@ -188,18 +198,26 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri } } - firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), lease); + if (needToFireEventReplicaExpired(grpId, lease)) { + expiredLeases.add(leases.leaseByGroupId().get(grpId)); + } } for (ReplicationGroupId grpId : leases.leaseByGroupId().keySet()) { if (!leasesMap.containsKey(grpId)) { tryRemoveTracker(grpId); - firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), null); + if (needToFireEventReplicaExpired(grpId, null)) { + expiredLeases.add(leases.leaseByGroupId().get(grpId)); + } } } leases = new Leases(unmodifiableMap(leasesMap), leasesBytes); + + for (Lease expiredLease : expiredLeases) { + firePrimaryReplicaExpiredEvent(event.revision(), expiredLease); + } } return allOf(fireEventFutures.toArray(CompletableFuture[]::new)); @@ -308,10 +326,10 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri * Fires the primary replica expire event if it needs. * * @param grpId Group id, used for the cases when the {@code lease} parameter is null. Should be always not null. - * @param causalityToken Causality token. * @param lease Lease to check on expiration. + * @return Whether the event is needed. */ - private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId grpId, long causalityToken, @Nullable Lease lease) { + private boolean needToFireEventReplicaExpired(ReplicationGroupId grpId, @Nullable Lease lease) { assert lease == null || lease.replicationGroupId().equals(grpId) : IgniteStringFormatter.format("Group id mismatch [groupId={}, lease={}]", grpId, lease); @@ -321,20 +339,34 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri boolean sameLease = lease != null && currentLease.getStartTime().equals(lease.getStartTime()); if (!sameLease) { - CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, fireEvent( - PRIMARY_REPLICA_EXPIRED, - new PrimaryReplicaEventParameters( - causalityToken, - grpId, - currentLease.getLeaseholderId(), - currentLease.getLeaseholder(), - currentLease.getStartTime() - ) - )); - - assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; + return true; } } + + return false; + } + + /** + * Fires the primary replica expire event. + * + * @param causalityToken Causality token. + * @param expiredLease Expired lease. + */ + private void firePrimaryReplicaExpiredEvent(long causalityToken, Lease expiredLease) { + ReplicationGroupId grpId = expiredLease.replicationGroupId(); + + CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, fireEvent( + PRIMARY_REPLICA_EXPIRED, + new PrimaryReplicaEventParameters( + causalityToken, + grpId, + expiredLease.getLeaseholderId(), + expiredLease.getLeaseholder(), + expiredLease.getStartTime() + ) + )); + + assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; } private CompletableFuture<Void> fireEventReplicaBecomePrimary(long causalityToken, Lease lease) { diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java index f0f3ec1304..1441ca84d7 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java @@ -587,15 +587,18 @@ public class RaftGroupServiceImpl implements RaftGroupService { CompletableFuture<? extends NetworkMessage> fut ) { if (recoverable(err)) { + Peer randomPeer = randomNode(peer); + LOG.warn( "Recoverable error during the request occurred (will be retried on the randomly selected node) " - + "[request={}, peer={}].", + + "[request={}, peer={}, newPeer={}].", err, sentRequest, - peer + peer, + randomPeer ); - scheduleRetry(() -> sendWithRetry(randomNode(peer), requestFactory, stopTime, fut)); + scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory, stopTime, fut)); } else { fut.completeExceptionally(err); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java index ab22a7f004..90f3e8481b 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java @@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; @@ -76,9 +77,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { throws ExecutionException, InterruptedException { createTestTableWith3Replicas(); - TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME); - - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); + var tblReplicationGrp = defaultTablePartitionId(node(0)); CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( tblReplicationGrp, @@ -101,6 +100,8 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { Tuple keyTpl = Tuple.create().set("key", 42); Tuple tpl = Tuple.create().set("key", 42).set("val", "val 42"); + TableImpl tbl = (TableImpl) coordinatorNode.tables().table(TABLE_NAME); + tbl.recordView().upsert(rwTx, tpl); msgConf.accept(primaryNode, coordinatorNode, tbl, rwTx); @@ -108,6 +109,12 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { finisher.accept(rwTx, tbl, keyTpl); } + private TablePartitionId defaultTablePartitionId(IgniteImpl node) { + TableImpl table = (TableImpl) node.tables().table(TABLE_NAME); + + return new TablePartitionId(table.tableId(), 0); + } + private void commitRow(InternalTransaction rwTx, TableImpl tbl, Tuple keyTpl) { rwTx.commit(); @@ -139,12 +146,16 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { AtomicBoolean dropMessage = new AtomicBoolean(true); + CountDownLatch commitStartedLatch = new CountDownLatch(1); + // Make sure the finish message is prepared, i.e. the outcome, commit timestamp, primary node, etc. have been set, // and then temporarily block the messaging to simulate network issues. coordinatorMessaging.dropMessages((s, networkMessage) -> { if (networkMessage instanceof TxFinishReplicaRequest && dropMessage.get()) { logger().info("Dropping: {}.", networkMessage); + commitStartedLatch.countDown(); + return true; } @@ -156,6 +167,8 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { // will run in the current thread. CompletableFuture.runAsync(() -> { try { + commitStartedLatch.await(); + logger().info("Start transferring primary."); NodeUtils.transferPrimary(tbl, null, this::node); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 6afc21aa95..d8611e60dc 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -164,6 +164,7 @@ import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.VaultService; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.network.ChannelType; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.DefaultMessagingService; @@ -1188,6 +1189,12 @@ public class IgniteImpl implements Ignite { return ((DefaultMessagingService) clusterSvc.messagingService()).dropMessagesPredicate(); } + // TODO IGNITE-18493 - remove/move this + @TestOnly + public void sendFakeMessage(String recipientConsistentId, NetworkMessage msg) { + clusterSvc.messagingService().send(recipientConsistentId, ChannelType.DEFAULT, msg); + } + // TODO: IGNITE-18493 - remove/move this @TestOnly public void stopDroppingMessages() { diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java index f69509742c..c54a802490 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java @@ -378,6 +378,31 @@ public class Cluster { nodes.set(index, null); } + /** + * Stops a node by name. + * + * @param name Name of the node in the cluster. + */ + public void stopNode(String name) { + stopNode(nodeIndex(name)); + } + + /** + * Returns index of the node. + * + * @param name Node name. + * @return Node index. + */ + public int nodeIndex(String name) { + for (int i = 0; i < nodes.size(); i++) { + if (nodes.get(i) != null && nodes.get(i).name().equals(name)) { + return i; + } + } + + throw new IllegalArgumentException("Node is not found: " + name); + } + /** * Restarts a node by index. * diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java index cc1d6fc133..ed657d7e85 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import java.nio.file.Path; import java.util.List; +import java.util.stream.Stream; import org.apache.ignite.Ignite; import org.apache.ignite.InitParametersBuilder; import org.apache.ignite.internal.app.IgniteImpl; @@ -171,6 +172,22 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes cluster.stopNode(nodeIndex); } + /** + * Stops a node by name. + * + * @param name Name of the node in the cluster. + */ + protected void stopNode(String name) { + cluster.stopNode(name); + } + + /** + * Returns nodes that are started and not stopped. This can include knocked out nodes. + */ + protected final Stream<IgniteImpl> runningNodes() { + return cluster.runningNodes(); + } + /** * Restarts a node by index. * diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index c72763b031..c0b023a208 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -173,7 +173,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest { public CompletableFuture<Void> finish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, - boolean commit, + boolean commitIntent, Map<TablePartitionId, Long> enlistedGroups, UUID txId ) { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java similarity index 77% rename from modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java rename to modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java index 694f27620e..d72df1b78e 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java @@ -26,6 +26,9 @@ import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -34,13 +37,19 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.IntStream; import org.apache.ignite.InitParametersBuilder; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory; +import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaResponse; +import org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse; +import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; @@ -60,6 +69,7 @@ import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; import org.apache.ignite.tx.TransactionOptions; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; @@ -70,6 +80,8 @@ import org.junit.jupiter.api.TestInfo; * Abandoned transactions integration tests. */ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { + private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory(); + /** Table name. */ private static final String TABLE_NAME = "test_table"; @@ -104,16 +116,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -168,16 +171,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -220,16 +214,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -275,16 +260,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -329,16 +305,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -402,16 +369,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -480,16 +438,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -564,16 +513,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -650,16 +590,7 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); - CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( - tblReplicationGrp, - node(0).clock().now(), - 10, - SECONDS - ); - - assertThat(primaryReplicaFut, willCompleteSuccessfully()); - - String leaseholder = primaryReplicaFut.join().getLeaseholder(); + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); @@ -690,6 +621,156 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { assertThat(finish2, willThrow(TransactionAlreadyFinishedException.class)); } + @Test + public void testPrimaryFailureRightAfterCommitMsg() throws Exception { + TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME); + + var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); + + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); + + IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); + + log.info("Transaction commit partition is determined [node={}].", commitPartNode.name()); + + IgniteImpl txCrdNode = nonPrimaryNode(leaseholder); + + log.info("Transaction coordinator node is determined [node={}].", txCrdNode.name()); + + Transaction rwTx1 = createRwTransaction(txCrdNode); + + CompletableFuture<?> commitMsgSentFut = new CompletableFuture<>(); + CompletableFuture<?> cancelLeaseFuture = new CompletableFuture<>(); + + txCrdNode.dropMessages((nodeName, msg) -> { + if (msg instanceof TxFinishReplicaRequest) { + boolean isFirst = !commitMsgSentFut.isDone(); + + if (isFirst) { + commitMsgSentFut.complete(null); + + return true; + } else { + cancelLeaseFuture.join(); + + return false; + } + } + + return false; + }); + + CompletableFuture<Void> commitFut = rwTx1.commitAsync(); + + assertThat(commitMsgSentFut, willCompleteSuccessfully()); + + cancelLease(commitPartNode, tblReplicationGrp); + + waitAndGetLeaseholder(txCrdNode, tblReplicationGrp); + + cancelLeaseFuture.complete(null); + + assertThat(commitFut, willCompleteSuccessfully()); + + RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView(); + + var rec = view.get(null, Tuple.create().set("key", 42)); + + assertNotNull(rec); + assertEquals((Integer) 42, rec.value("key")); + assertEquals("val1", rec.value("val")); + } + + @Test + public void testPrimaryFailureWhileInflightInProgress() throws Exception { + TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME); + + var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); + + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); + + IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); + + log.info("Transaction commit partition is determined [node={}].", commitPartNode.name()); + + IgniteImpl txCrdNode = nonPrimaryNode(leaseholder); + + log.info("Transaction coordinator node is determined [node={}].", txCrdNode.name()); + + Transaction rwTx1 = createRwTransaction(txCrdNode); + + txCrdNode.dropMessages((nodeName, msg) -> { + if (msg instanceof ReadWriteSingleRowReplicaRequest) { + return true; + } + + return false; + }); + + assertThrows(TransactionException.class, () -> { + RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView(); + view.upsert(rwTx1, Tuple.create().set("key", 1).set("val", "val1")); + }); + + CompletableFuture<Void> commitFut = rwTx1.commitAsync(); + + commitPartNode.stop(); + + assertThat(commitFut, willCompleteSuccessfully()); + } + + @Test + public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse() throws Exception { + TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME); + + var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); + + String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); + + IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); + + log.info("Transaction commit partition is determined [node={}].", commitPartNode.name()); + + IgniteImpl txCrdNode = nonPrimaryNode(leaseholder); + + log.info("Transaction coordinator node is determined [node={}].", txCrdNode.name()); + + CompletableFuture<?> firstResponseSent = new CompletableFuture<>(); + + commitPartNode.dropMessages((nodeName, msg) -> { + if (msg instanceof TimestampAwareReplicaResponse) { + TimestampAwareReplicaResponse response = (TimestampAwareReplicaResponse) msg; + + if (response.result() == null) { + firstResponseSent.complete(null); + } + + // This means this is the second response that finishes an in-flight future. + if (response.result() instanceof UUID) { + return true; + } + } + + return false; + }); + + Transaction rwTx1 = createRwTransaction(txCrdNode); + + CompletableFuture<Void> commitFut = rwTx1.commitAsync(); + + assertThat(firstResponseSent, willCompleteSuccessfully()); + + cancelLease(commitPartNode, tblReplicationGrp); + + assertThat(commitFut, willThrow(TransactionAlreadyFinishedException.class, 30, SECONDS)); + + RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView(); + + var rec = view.get(null, Tuple.create().set("key", 42)); + + assertNull(rec); + } + private DefaultMessagingService messaging(IgniteImpl node) { ClusterService coordinatorService = IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "clusterSvc"); @@ -802,19 +883,42 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { return rwTx1; } - private IgniteImpl commitPartitionPrimaryNode(String leaseholder) { - return IntStream.range(0, initialNodes()) + private IgniteImpl findNode(int startRange, int endRange, Predicate<IgniteImpl> filter) { + return IntStream.range(startRange, endRange) .mapToObj(this::node) - .filter(n -> leaseholder.equals(n.name())) + .filter(filter::test) .findFirst() .get(); } + private IgniteImpl commitPartitionPrimaryNode(String leaseholder) { + return findNode(0, initialNodes(), n -> leaseholder.equals(n.name())); + } + private IgniteImpl nonPrimaryNode(String leaseholder) { - return IntStream.range(1, initialNodes()) - .mapToObj(this::node) - .filter(n -> !leaseholder.equals(n.name())) - .findFirst() - .get(); + return findNode(1, initialNodes(), n -> !leaseholder.equals(n.name())); + } + + private String waitAndGetLeaseholder(IgniteImpl node, ReplicationGroupId tblReplicationGrp) throws InterruptedException { + CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica( + tblReplicationGrp, + node.clock().now(), + 10, + SECONDS + ); + + assertThat(primaryReplicaFut, willCompleteSuccessfully()); + + return primaryReplicaFut.join().getLeaseholder(); + } + + private void cancelLease(IgniteImpl leaseholder, ReplicationGroupId groupId) { + StopLeaseProlongationMessage msg = PLACEMENT_DRIVER_MESSAGES_FACTORY + .stopLeaseProlongationMessage() + .groupId(groupId) + .build(); + + // Just sent it to all nodes to not determine the exact placement driver active actor. + runningNodes().forEach(node -> leaseholder.sendFakeMessage(node.name(), msg)); } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 5499863538..b3505d2d21 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.contains; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -429,7 +430,13 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { }); var err = assertThrows(CompletionException.class, fut0::join); - assertEquals(IllegalArgumentException.class, err.getCause().getClass()); + + try { + assertInstanceOf(IllegalArgumentException.class, err.getCause()); + } catch (AssertionError e) { + throw new AssertionError("Unexpected exception type", err); + } + assertEquals(balance, view.get(null, makeKey(1)).doubleValue("balance")); } @@ -449,7 +456,12 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { }); var err = assertThrows(CompletionException.class, fut0::join); - assertEquals(NullPointerException.class, err.getCause().getClass()); + + try { + assertInstanceOf(NullPointerException.class, err.getCause()); + } catch (AssertionError e) { + throw new AssertionError("Unexpected exception type", err); + } } @Test diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java index 2121aafa3d..6d14d467fa 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java @@ -31,11 +31,15 @@ public class TransactionAlreadyFinishedException extends TransactionException { /** Stored transaction result. */ private final TransactionResult transactionResult; - public TransactionAlreadyFinishedException(String message, TransactionResult transactionResult) { - super(TX_UNEXPECTED_STATE_ERR, message); + public TransactionAlreadyFinishedException(int errorCode, String message, TransactionResult transactionResult, Throwable cause) { + super(errorCode, message, cause); this.transactionResult = transactionResult; } + public TransactionAlreadyFinishedException(String message, TransactionResult transactionResult) { + this(TX_UNEXPECTED_STATE_ERR, message, transactionResult, null); + } + public TransactionResult transactionResult() { return transactionResult; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java index a684f6fc89..fdbbaffdfb 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java @@ -38,7 +38,7 @@ public class PrimaryReplicaExpiredException extends IgniteInternalException { public PrimaryReplicaExpiredException( ReplicationGroupId groupId, Long expectedEnlistmentConsistencyToken, - HybridTimestamp commitTimestamp, + @Nullable HybridTimestamp commitTimestamp, @Nullable ReplicaMeta currentPrimaryReplica ) { super( diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index cd8c65eb67..e40523e56d 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.tx.impl; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.CompletableFuture.supplyAsync; import static org.apache.ignite.internal.tx.TxState.ABORTED; @@ -25,10 +26,12 @@ import static org.apache.ignite.internal.tx.TxState.COMMITTED; import static org.apache.ignite.internal.tx.TxState.FINISHING; import static org.apache.ignite.internal.tx.TxState.PENDING; import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR; import java.io.IOException; @@ -53,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -60,10 +64,14 @@ import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; +import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException; import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse; import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup; @@ -174,6 +182,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { */ private final TxMessageSender txMessageSender; + private final EventListener<PrimaryReplicaEventParameters> primaryReplicaEventListener; + /** * The constructor. * @@ -204,6 +214,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; this.topologyService = clusterService.topologyService(); this.messagingService = clusterService.messagingService(); + this.primaryReplicaEventListener = this::primaryReplicaEventListener; placementDriverHelper = new PlacementDriverHelper(placementDriver, clock); @@ -229,6 +240,30 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, placementDriverHelper, writeIntentSwitchProcessor); } + private CompletableFuture<Boolean> primaryReplicaEventListener(PrimaryReplicaEventParameters eventParameters, Throwable err) { + return inBusyLock(busyLock, () -> { + if (!(eventParameters.groupId() instanceof TablePartitionId)) { + return falseCompletedFuture(); + } + + TablePartitionId groupId = (TablePartitionId) eventParameters.groupId(); + + for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) { + TxContext txContext = ctxEntry.getValue(); + + if (txContext.isTxFinishing()) { + Long enlistmentConsistencyToken = txContext.enlistedGroups.get(groupId); + + if (enlistmentConsistencyToken != null) { + txContext.cancelWaitingInflights(groupId, enlistmentConsistencyToken); + } + } + } + + return falseCompletedFuture(); + }); + } + @Override public InternalTransaction begin(HybridTimestampTracker timestampTracker) { return begin(timestampTracker, false, TxPriority.NORMAL); @@ -330,17 +365,19 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { public CompletableFuture<Void> finish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, - boolean commit, + boolean commitIntent, Map<TablePartitionId, Long> enlistedGroups, UUID txId ) { - LOG.debug("Finish [commit={}, txId={}, groups={}].", commit, txId, enlistedGroups); + LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, txId, enlistedGroups); assert enlistedGroups != null; if (enlistedGroups.isEmpty()) { // If there are no enlisted groups, just update local state - we already marked the tx as finished. - updateTxMeta(txId, old -> new TxStateMeta(commit ? COMMITTED : ABORTED, localNodeId, commitPartition, commitTimestamp(commit))); + updateTxMeta(txId, old -> new TxStateMeta( + commitIntent ? COMMITTED : ABORTED, localNodeId, commitPartition, commitTimestamp(commitIntent) + )); return nullCompletedFuture(); } @@ -367,27 +404,17 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { // If the state is FINISHING then someone else hase in in the middle of finishing this tx. if (stateMeta.txState() == FINISHING) { return ((TxStateMetaFinishing) stateMeta).txFinishFuture() - .thenCompose(meta -> checkTxOutcome(commit, txId, meta)); + .thenCompose(meta -> checkTxOutcome(commitIntent, txId, meta)); } else { // The TX has already been finished. Check whether it finished with the same outcome. - return checkTxOutcome(commit, txId, stateMeta); + return checkTxOutcome(commitIntent, txId, stateMeta); } } - TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> { - if (tuple0 == null) { - tuple0 = new TxContext(); // No writes enlisted. - } - - assert !tuple0.isTxFinishing() : "Transaction is already finished [id=" + uuid + "]."; - - tuple0.finishTx(); - - return tuple0; - }); + TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups, commitIntent); // Wait for commit acks first, then proceed with the finish request. - return tuple.performFinish(commit, ignored -> + return txContext.performFinish(commitIntent, commit -> prepareFinish( observableTimestampTracker, commitPartition, @@ -398,12 +425,26 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { )); } + private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, Long> enlistedGroups, boolean commitIntent) { + return txCtxMap.compute(txId, (uuid, tuple0) -> { + if (tuple0 == null) { + tuple0 = new TxContext(placementDriver); // No writes enlisted. + } + + assert !tuple0.isTxFinishing() : "Transaction is already finished [id=" + uuid + "]."; + + tuple0.finishTx(enlistedGroups); + + return tuple0; + }); + } + private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID txId, TransactionMeta stateMeta) { if ((stateMeta.txState() == COMMITTED) == commit) { return nullCompletedFuture(); } - return CompletableFuture.failedFuture(new TransactionAlreadyFinishedException( + return failedFuture(new TransactionAlreadyFinishedException( "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + stateMeta.txState() + "].", new TransactionResult(stateMeta.txState(), stateMeta.commitTimestamp())) ); @@ -440,7 +481,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { txFinishFuture); }) .thenCompose(Function.identity()) - // verification future is added in order to share proper exception with the client + // Verification future is added in order to share the proper verification exception with the client. .thenCompose(r -> verificationFuture); } @@ -595,6 +636,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { txCleanupRequestHandler.start(); + placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, primaryReplicaEventListener); + return nullCompletedFuture(); } @@ -614,6 +657,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { txCleanupRequestHandler.stop(); + placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, primaryReplicaEventListener); + shutdownAndAwaitTermination(cleanupExecutor, 10, TimeUnit.SECONDS); } @@ -683,7 +728,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { txCtxMap.compute(txId, (uuid, tuple) -> { if (tuple == null) { - tuple = new TxContext(); + tuple = new TxContext(placementDriver); } if (tuple.isTxFinishing()) { @@ -727,9 +772,9 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } // Process directly sent response. - ReplicaResponse request = (ReplicaResponse) message; + ReplicaResponse response = (ReplicaResponse) message; - Object result = request.result(); + Object result = response.result(); if (result instanceof UUID) { removeInflight((UUID) result); @@ -769,7 +814,6 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { "Commit timestamp is greater than primary replica expiration timestamp:" + " [groupId = {}, commit timestamp = {}, primary replica expiration timestamp = {}]", groupId, commitTimestamp, currentPrimaryReplica.getExpirationTime()); - } }); } @@ -780,25 +824,63 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { private static class TxContext { volatile long inflights = 0; // Updated under lock. private final CompletableFuture<Void> waitRepFut = new CompletableFuture<>(); + private final PlacementDriver placementDriver; volatile CompletableFuture<Void> finishInProgressFuture = null; + volatile Map<TablePartitionId, Long> enlistedGroups; + + private TxContext(PlacementDriver placementDriver) { + this.placementDriver = placementDriver; + } - CompletableFuture<Void> performFinish(boolean commit, Function<Void, CompletableFuture<Void>> finishAction) { + CompletableFuture<Void> performFinish(boolean commit, Function<Boolean, CompletableFuture<Void>> finishAction) { waitReadyToFinish(commit) - .thenCompose(finishAction) - .handle((ignored, err) -> { - if (err == null) { - finishInProgressFuture.complete(null); - } else { - finishInProgressFuture.completeExceptionally(err); - } - return null; - }); + .whenComplete((ignoredReadyToFinish, readyException) -> finishAction.apply(commit && readyException == null) + .whenComplete((ignoredFinishActionResult, finishException) -> + completeFinishInProgressFuture(commit, readyException, finishException)) + ); return finishInProgressFuture; } + private void completeFinishInProgressFuture( + boolean commit, + @Nullable Throwable readyToFinishException, + @Nullable Throwable finishException + ) { + if (readyToFinishException == null) { + if (finishException == null) { + finishInProgressFuture.complete(null); + } else { + finishInProgressFuture.completeExceptionally(finishException); + } + } else { + if (commit && readyToFinishException instanceof PrimaryReplicaExpiredException) { + finishInProgressFuture.completeExceptionally(new TransactionAlreadyFinishedException( + TX_PRIMARY_REPLICA_EXPIRED_ERR, + "Failed to commit the transaction.", + new TransactionResult(ABORTED, null), + readyToFinishException + )); + } else { + finishInProgressFuture.completeExceptionally(readyToFinishException); + } + } + } + private CompletableFuture<Void> waitReadyToFinish(boolean commit) { - return commit ? waitNoInflights() : nullCompletedFuture(); + if (commit) { + for (Map.Entry<TablePartitionId, Long> e : enlistedGroups.entrySet()) { + ReplicaMeta replicaMeta = placementDriver.currentLease(e.getKey()); + + if (replicaMeta == null || !e.getValue().equals(replicaMeta.getStartTime().longValue())) { + return failedFuture(new PrimaryReplicaExpiredException(e.getKey(), e.getValue(), null, replicaMeta)); + } + } + + return waitNoInflights(); + } else { + return nullCompletedFuture(); + } } private CompletableFuture<Void> waitNoInflights() { @@ -808,13 +890,18 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { return waitRepFut; } + private void cancelWaitingInflights(TablePartitionId groupId, Long enlistmentConsistencyToken) { + waitRepFut.completeExceptionally(new PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null, null)); + } + void onRemovedInflights() { if (inflights == 0 && finishInProgressFuture != null) { waitRepFut.complete(null); } } - void finishTx() { + void finishTx(Map<TablePartitionId, Long> enlistedGroups) { + this.enlistedGroups = enlistedGroups; finishInProgressFuture = new CompletableFuture<>(); } @@ -832,6 +919,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { private static final Set<Class<? extends Throwable>> RECOVERABLE = Set.of( TimeoutException.class, IOException.class, + ReplicationException.class, ReplicationTimeoutException.class, PrimaryReplicaMissException.class ); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java index f32ef4e0a6..dc66dbec6c 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java @@ -288,6 +288,23 @@ public abstract class AbstractDeadlockPreventionTest extends AbstractLockingTest assertThat(futTx1, willSucceedFast()); } + @Test + public void testIncompatibleLockRetry() { + var tx1 = beginTx(); + var tx2 = beginTx(); + + var k = key("test"); + + assertThat(slock(tx1, k), willSucceedFast()); + assertThat(slock(tx2, k), willSucceedFast()); + + assertFutureFailsOrWaitsForTimeout(() -> xlock(tx2, k)); + + commitTx(tx1); + + assertThat(xlock(tx2, k), willSucceedFast()); + } + /** * This method checks lock future of conflicting transaction provided by supplier, in a way depending on deadlock prevention policy. * If the policy does not allow wait on conflict (wait timeout is equal to {@code 0}) then the future must be failed with diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 1614346449..287dde26cf 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -360,6 +360,8 @@ public class TxManagerTest extends IgniteAbstractTest { // Same primary that was enlisted is returned during finish phase and commitTimestamp is less that primary.expirationTimestamp. when(placementDriver.getPrimaryReplica(any(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + when(placementDriver.currentLease(any())).thenReturn( + new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE)); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); @@ -383,6 +385,8 @@ public class TxManagerTest extends IgniteAbstractTest { .thenReturn( completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10))) ); + when(placementDriver.currentLease(any())) + .thenReturn(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10))); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())) .thenReturn( completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10))), @@ -420,7 +424,7 @@ public class TxManagerTest extends IgniteAbstractTest { @Test public void testFinishExpiredWithNullPrimary() { // Null is returned as primaryReplica during finish phase. - when(placementDriver.getPrimaryReplica(any(), any())).thenReturn(nullCompletedFuture()); + when(placementDriver.currentLease(any())).thenReturn(null); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) @@ -434,7 +438,7 @@ public class TxManagerTest extends IgniteAbstractTest { @Test public void testExpiredExceptionDoesNotShadeResponseExceptions() { // Null is returned as primaryReplica during finish phase. - when(placementDriver.getPrimaryReplica(any(), any())).thenReturn(nullCompletedFuture()); + when(placementDriver.currentLease(any())).thenReturn(null); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) @@ -469,6 +473,8 @@ public class TxManagerTest extends IgniteAbstractTest { when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any())) .thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + when(placementDriver.currentLease(eq(tablePartitionId1))) + .thenReturn(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE)); when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId1), any(), anyLong(), any())) .thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); @@ -499,6 +505,7 @@ public class TxManagerTest extends IgniteAbstractTest { // given test checks that an assertion exception will be thrown and wrapped with proper transaction public one. when(placementDriver.getPrimaryReplica(any(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); + when(placementDriver.currentLease(any())).thenReturn(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10))); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) @@ -520,8 +527,8 @@ public class TxManagerTest extends IgniteAbstractTest { @Test public void testFinishExpiredWithDifferentEnlistmentConsistencyToken() { // Primary with another enlistment consistency token is returned. - when(placementDriver.getPrimaryReplica(any(), any())).thenReturn(completedFuture( - new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), HybridTimestamp.MAX_VALUE))); + when(placementDriver.currentLease(any())).thenReturn( + new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), HybridTimestamp.MAX_VALUE)); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), HybridTimestamp.MAX_VALUE))); when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class)))