This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 5d6b90d309 IGNITE-20874 Node cleanup procedure (#2877) 5d6b90d309 is described below commit 5d6b90d309bbc7f79ff7f43863507c8d8c3042de Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Fri Dec 8 20:24:13 2023 +0300 IGNITE-20874 Node cleanup procedure (#2877) --- .../apache/ignite/client/fakes/FakeTxManager.java | 10 +- .../ignite/internal/table/ItDurableFinishTest.java | 4 +- .../ItTxDistributedCleanupRecoveryTest.java | 144 ++----------------- ...xDistributedTestSingleNodeNoCleanupMessage.java | 6 +- .../table/distributed/StorageUpdateHandler.java | 12 +- .../table/distributed/TableMessageGroup.java | 6 +- ...pCommand.java => WriteIntentSwitchCommand.java} | 4 +- .../table/distributed/raft/PartitionListener.java | 12 +- .../replicator/PartitionReplicaListener.java | 145 +++++-------------- .../table/distributed/IndexCleanupTest.java | 12 +- .../table/distributed/StorageCleanupTest.java | 38 ++--- .../PartitionRaftCommandsSerializationTest.java | 20 +-- .../raft/PartitionCommandListenerTest.java | 20 +-- .../PartitionReplicaListenerDurableUnlockTest.java | 47 ++---- .../replication/PartitionReplicaListenerTest.java | 16 +-- .../table/impl/DummyInternalTableImpl.java | 65 ++++++++- .../org/apache/ignite/internal/tx/TxManager.java | 17 ++- .../internal/tx/impl/PlacementDriverHelper.java | 148 +++++++++++++++++++ .../internal/tx/impl/TxCleanupRequestHandler.java | 158 +++++++++++++++++++++ .../internal/tx/impl/TxCleanupRequestSender.java | 151 ++++++++++++++++++++ .../ignite/internal/tx/impl/TxManagerImpl.java | 114 ++++++--------- .../ignite/internal/tx/impl/TxMessageSender.java | 158 +++++++++++++++++++++ .../tx/impl/WriteIntentSwitchProcessor.java | 121 ++++++++++++++++ ...upReplicaRequest.java => TxCleanupMessage.java} | 27 ++-- .../tx/message/TxCleanupMessageErrorResponse.java | 35 +++++ .../tx/message/TxCleanupMessageResponse.java | 28 ++++ .../ignite/internal/tx/message/TxMessageGroup.java 
| 19 ++- ...t.java => WriteIntentSwitchReplicaRequest.java} | 12 +- modules/transactions/tech-notes/cleanup.puml | 51 +++++++ 29 files changed, 1147 insertions(+), 453 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 43ff6feaa9..dce43dd8d2 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.client.fakes; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -176,16 +177,15 @@ public class FakeTxManager implements TxManager { Map<TablePartitionId, Long> enlistedGroups, UUID txId ) { - return null; + return nullCompletedFuture(); } @Override public CompletableFuture<Void> cleanup( - String primaryConsistentId, - TablePartitionId tablePartitionId, - UUID txId, + Collection<TablePartitionId> partitions, boolean commit, - @Nullable HybridTimestamp commitTimestamp + @Nullable HybridTimestamp commitTimestamp, + UUID txId ) { return nullCompletedFuture(); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java index c55f46f26e..ab22a7f004 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java @@ -40,8 +40,8 @@ import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; import 
org.apache.ignite.internal.tx.TxMeta; -import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.DefaultMessagingService; @@ -229,7 +229,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { // Make sure the finish message is prepared, i.e. the outcome, commit timestamp, primary node, etc. have been set, // and then temporarily block the messaging to simulate network issues. primaryMessaging.dropMessages((s, networkMessage) -> { - if (networkMessage instanceof TxCleanupReplicaRequest && dropMessage.get()) { + if (networkMessage instanceof WriteIntentSwitchReplicaRequest && dropMessage.get()) { logger().info("Dropping message: {}.", networkMessage); return true; diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java index 4f102c2eca..469581cd52 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java @@ -17,40 +17,10 @@ package org.apache.ignite.distributed; -import static java.util.concurrent.CompletableFuture.failedFuture; -import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import 
org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.hlc.HybridClock; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.raft.service.RaftGroupService; -import org.apache.ignite.internal.replicator.ReplicaResult; -import org.apache.ignite.internal.replicator.exception.ReplicationException; -import org.apache.ignite.internal.replicator.message.ReplicaRequest; -import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.table.distributed.IndexLocker; -import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; -import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; -import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; -import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; -import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; -import org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource; -import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest; -import org.apache.ignite.internal.tx.storage.state.TxStateStorage; -import org.apache.ignite.internal.util.Lazy; -import org.apache.ignite.internal.util.PendingComparableValuesTracker; -import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.tx.TransactionException; +import org.apache.ignite.internal.tx.message.TxCleanupMessage; +import org.apache.ignite.network.DefaultMessagingService; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; /** @@ -88,66 +58,7 @@ public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingl replicas(), startClient(), timestampTracker - ) { - - @Override - protected PartitionReplicaListener 
newReplicaListener( - MvPartitionStorage mvDataStorage, - RaftGroupService raftClient, - TxManager txManager, - Executor scanRequestExecutor, - int partId, - int tableId, - Supplier<Map<Integer, IndexLocker>> indexesLockers, - Lazy<TableSchemaAwareIndexStorage> pkIndexStorage, - Supplier<Map<Integer, TableSchemaAwareIndexStorage>> secondaryIndexStorages, - HybridClock hybridClock, - PendingComparableValuesTracker<HybridTimestamp, Void> safeTime, - TxStateStorage txStateStorage, - TransactionStateResolver transactionStateResolver, - StorageUpdateHandler storageUpdateHandler, - ValidationSchemasSource validationSchemasSource, - ClusterNode localNode, - SchemaSyncService schemaSyncService, - CatalogService catalogService, - PlacementDriver placementDriver - ) { - return new PartitionReplicaListener( - mvDataStorage, - raftClient, - txManager, - txManager.lockManager(), - Runnable::run, - partId, - tableId, - indexesLockers, - pkIndexStorage, - secondaryIndexStorages, - hybridClock, - safeTime, - txStateStorage, - transactionStateResolver, - storageUpdateHandler, - validationSchemasSource, - localNode, - schemaSyncService, - catalogService, - placementDriver - ) { - @Override - public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) { - if (request instanceof TxCleanupReplicaRequest && defaultRetryCount.getAndDecrement() > 0) { - logger().info("Dropping cleanup request: {}", request); - - return failedFuture(new ReplicationException( - REPLICA_COMMON_ERR, - "Test Tx Cleanup exception [replicaGroupId=" + request.groupId() + ']')); - } - return super.invoke(request, senderId); - } - }; - } - }; + ); txTestCluster.prepareCluster(); @@ -156,47 +67,18 @@ public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingl accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACC_TABLE_ID, ACCOUNTS_SCHEMA); customers = txTestCluster.startTable(CUST_TABLE_NAME, CUST_TABLE_ID, CUSTOMERS_SCHEMA); - log.info("Tables have been started"); 
- } - - - @Test - @Override - public void testDeleteUpsertCommit() throws TransactionException { - // The value of 6 is higher than the default retry count. - // So we should give up retrying and crash. - setDefaultRetryCount(6); - - assertThrows(TransactionException.class, () -> deleteUpsert().commit()); - } + txTestCluster.cluster.forEach(clusterService -> { + DefaultMessagingService messagingService = (DefaultMessagingService) clusterService.messagingService(); + messagingService.dropMessages((s, networkMessage) -> { + if (networkMessage instanceof TxCleanupMessage && defaultRetryCount.getAndDecrement() > 0) { + logger().info("Dropping cleanup request: {}", networkMessage); - @Test - @Override - public void testTransactionAlreadyRolledback() { - // The value of 6 is higher than the default retry count. - // So we should give up retrying and crash. - setDefaultRetryCount(6); - - // Do not check the locks since we have intentionally dropped the cleanup request thus the locks are not released yet. - testTransactionAlreadyFinished(false, false, (transaction, txId) -> { - assertThrows(TransactionException.class, transaction::rollback); - - log.info("Rolled back transaction {}", txId); + return true; + } + return false; + }); }); - } - @Test - @Override - public void testTransactionAlreadyCommitted() { - // The value of 6 is higher than the default retry count. - // So we should give up retrying and crash. - setDefaultRetryCount(6); - - // Do not check the locks since we have intentionally dropped the cleanup request thus the locks are not released yet. 
- testTransactionAlreadyFinished(true, false, (transaction, txId) -> { - assertThrows(TransactionException.class, transaction::commit); - - log.info("Committed transaction {}", txId); - }); + log.info("Tables have been started"); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index e0fcc50880..97201815c5 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -56,7 +56,7 @@ import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; import org.apache.ignite.internal.tx.impl.TxManagerImpl; -import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest; +import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; @@ -176,11 +176,11 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut ) { @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) { - if (request instanceof TxCleanupReplicaRequest) { + if (request instanceof WriteIntentSwitchReplicaRequest) { logger().info("Dropping cleanup request: {}", request); releaseTxLocks( - ((TxCleanupReplicaRequest) request).txId(), + ((WriteIntentSwitchReplicaRequest) request).txId(), txManager.lockManager() ); diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index 9d5050e94b..ca678a9ca1 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -284,25 +284,27 @@ public class StorageUpdateHandler { } /** - * Handles the cleanup of a transaction. The transaction is either committed or rolled back. + * Switches write intents created by the transaction to regular values if the transaction is committed + * or removes them if the transaction is aborted. * * @param txId Transaction id. * @param commit Commit flag. {@code true} if transaction is committed, {@code false} otherwise. * @param commitTimestamp Commit timestamp. Not {@code null} if {@code commit} is {@code true}. */ - public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) { - handleTransactionCleanup(txId, commit, commitTimestamp, null); + public void switchWriteIntents(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) { + switchWriteIntents(txId, commit, commitTimestamp, null); } /** - * Handles the cleanup of a transaction. The transaction is either committed or rolled back. + * Switches write intents created by the transaction to regular values if the transaction is committed + * or removes them if the transaction is aborted. * * @param txId Transaction id. * @param commit Commit flag. {@code true} if transaction is committed, {@code false} otherwise. * @param commitTimestamp Commit timestamp. Not {@code null} if {@code commit} is {@code true}. * @param onApplication On application callback. 
*/ - public void handleTransactionCleanup( + public void switchWriteIntents( UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java index 9e697fa2d5..56eeaa3e8d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java @@ -23,9 +23,9 @@ import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; import org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage; -import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; +import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand; import org.apache.ignite.internal.table.distributed.message.HasDataRequest; import org.apache.ignite.internal.table.distributed.message.HasDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; @@ -192,8 +192,8 @@ public interface TableMessageGroup { /** Message type for {@link FinishTxCommand}. */ short FINISH_TX = 40; - /** Message type for {@link TxCleanupCommand}. */ - short TX_CLEANUP = 41; + /** Message type for {@link WriteIntentSwitchCommand}. */ + short WRITE_INTENT_SWITCH = 41; /** Message type for {@link UpdateAllCommand}. 
*/ short UPDATE_ALL = 42; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java similarity index 92% rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java rename to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java index efed17c540..c51610e307 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java @@ -27,8 +27,8 @@ import org.jetbrains.annotations.Nullable; /** * State machine command to cleanup on a transaction commit. */ -@Transferable(TableMessageGroup.Commands.TX_CLEANUP) -public interface TxCleanupCommand extends PartitionCommand { +@Transferable(TableMessageGroup.Commands.WRITE_INTENT_SWITCH) +public interface WriteIntentSwitchCommand extends PartitionCommand { /** * Returns a commit or a rollback state. 
*/ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 38e23f85d9..e516799cc2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -62,9 +62,9 @@ import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; -import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; +import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; @@ -199,8 +199,8 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm); } else if (command instanceof FinishTxCommand) { handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm); - } else if (command instanceof TxCleanupCommand) { - handleTxCleanupCommand((TxCleanupCommand) command, commandIndex, commandTerm); + } else if (command instanceof WriteIntentSwitchCommand) { + handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm); } else if (command instanceof SafeTimeSyncCommand) { handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm); } 
else if (command instanceof BuildIndexCommand) { @@ -368,13 +368,13 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler /** - * Handler for the {@link TxCleanupCommand}. + * Handler for the {@link WriteIntentSwitchCommand}. * * @param cmd Command. * @param commandIndex Index of the RAFT command. * @param commandTerm Term of the RAFT command. */ - private void handleTxCleanupCommand(TxCleanupCommand cmd, long commandIndex, long commandTerm) { + private void handleWriteIntentSwitchCommand(WriteIntentSwitchCommand cmd, long commandIndex, long commandTerm) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { return; @@ -384,7 +384,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler markFinished(txId, cmd.commit(), cmd.commitTimestamp(), cmd.txCoordinatorId()); - storageUpdateHandler.handleTransactionCleanup(txId, cmd.commit(), cmd.commitTimestamp(), + storageUpdateHandler.switchWriteIntents(txId, cmd.commit(), cmd.commitTimestamp(), () -> storage.lastApplied(commandIndex, commandTerm)); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 0628f26bd6..232bce8a49 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; -import static java.util.concurrent.TimeUnit.SECONDS; import static 
java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; @@ -36,12 +35,10 @@ import static org.apache.ignite.internal.util.CompletableFutures.emptyCollection import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.internal.util.IgniteUtils.findAny; import static org.apache.ignite.internal.util.IgniteUtils.findFirst; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; -import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_WAS_ABORTED_ERR; import static org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause; @@ -65,7 +62,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; @@ -124,10 +120,10 @@ import org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuild import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; import org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage; import org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessageBuilder; -import 
org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder; +import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; @@ -160,10 +156,10 @@ import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; -import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; import org.apache.ignite.internal.tx.message.TxRecoveryMessage; import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; +import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.Cursor; @@ -184,10 +180,6 @@ public class PartitionReplicaListener implements ReplicaListener { /** Logger. */ private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class); - private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10; - - private static final int ATTEMPTS_TO_CLEANUP_REPLICA = 5; - /** Factory to create RAFT command messages. 
*/ private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); @@ -388,7 +380,13 @@ public class PartitionReplicaListener implements ReplicaListener { } private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) { - return cleanup(txId, txMeta) + Collection<TablePartitionId> enlistedPartitions = txMeta.enlistedPartitions(); + + boolean commit = txMeta.txState() == COMMITED; + + HybridTimestamp commitTimestamp = txMeta.commitTimestamp(); + + return txManager.cleanup(enlistedPartitions, commit, commitTimestamp, txId) .handle((v, e) -> { if (e == null) { return txManager.executeCleanupAsync(() -> markLocksReleased( @@ -706,8 +704,8 @@ public class PartitionReplicaListener implements ReplicaListener { return nullCompletedFuture(); } else if (request instanceof TxFinishReplicaRequest) { return processTxFinishAction((TxFinishReplicaRequest) request, senderId); - } else if (request instanceof TxCleanupReplicaRequest) { - return processTxCleanupAction((TxCleanupReplicaRequest) request); + } else if (request instanceof WriteIntentSwitchReplicaRequest) { + return processWriteIntentSwitchAction((WriteIntentSwitchReplicaRequest) request); } else if (request instanceof ReadOnlySingleRowPkReplicaRequest) { return processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, isPrimary); } else if (request instanceof ReadOnlyMultiRowPkReplicaRequest) { @@ -1534,83 +1532,17 @@ public class PartitionReplicaListener implements ReplicaListener { } } - CompletableFuture<?> changeStateFuture = finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId); - - return cleanup(changeStateFuture, enlistedPartitions, commit, commitTimestamp, txId, ATTEMPTS_TO_CLEANUP_REPLICA) - .thenRun(() -> markLocksReleased( - txId, - enlistedPartitions, - commit ? 
COMMITED : ABORTED, - commitTimestamp) + return finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId) + .thenCompose(v -> txManager.cleanup(enlistedPartitions, commit, commitTimestamp, txId)) + .thenRun(() -> + markLocksReleased( + txId, + enlistedPartitions, + commit ? COMMITED : ABORTED, + commitTimestamp) ); } - private CompletableFuture<Void> cleanup(UUID txId, TxMeta txMeta) { - return cleanup(nullCompletedFuture(), txMeta.enlistedPartitions(), txMeta.txState() == COMMITED, txMeta.commitTimestamp(), txId, 1); - } - - // TODO https://issues.apache.org/jira/browse/IGNITE-20681 remove attempts count. - private CompletableFuture<Void> cleanup( - CompletableFuture<?> changeStateFuture, - Collection<TablePartitionId> enlistedPartitions, - boolean commit, - @Nullable HybridTimestamp commitTimestamp, - UUID txId, - int attemptsToCleanupReplica - ) { - CompletableFuture<?>[] futures = enlistedPartitions.stream() - .map(partitionId -> changeStateFuture.thenCompose(ignored -> - // TODO: IGNITE-20874 Use the node cleanup procedure instead of the replication group cleanup one. - cleanupWithRetry(commit, commitTimestamp, txId, partitionId, attemptsToCleanupReplica))) - .toArray(size -> new CompletableFuture<?>[size]); - - return allOf(futures); - } - - private CompletableFuture<Void> cleanupWithRetry( - boolean commit, - @Nullable HybridTimestamp commitTimestamp, - UUID txId, - TablePartitionId partitionId, - int attempts) { - return findPrimaryReplica(partitionId, hybridClock.now()) - .thenCompose(leaseHolder -> - txManager.cleanup(leaseHolder, partitionId, txId, commit, commitTimestamp)) - .handle((res, ex) -> { - if (ex != null) { - if (attempts > 0) { - LOG.warn("Failed to perform cleanup on Tx. 
The operation will be retried [txId={}].", txId, ex); - } else { - LOG.warn("Failed to perform cleanup on Tx [txId={}].", txId, ex); - } - - if (attempts > 0) { - return cleanupWithRetry(commit, commitTimestamp, txId, partitionId, attempts - 1); - } - - return CompletableFuture.<Void>failedFuture(ex); - } - - return CompletableFutures.<Void>nullCompletedFuture(); - }) - .thenCompose(Function.identity()); - } - - private CompletableFuture<String> findPrimaryReplica(TablePartitionId partitionId, HybridTimestamp now) { - return placementDriver.awaitPrimaryReplica(partitionId, now, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS) - .handle((primaryReplica, e) -> { - if (e != null) { - LOG.error("Failed to retrieve primary replica for partition {}", partitionId, e); - - throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, - "Failed to get the primary replica" - + " [tablePartitionId=" + partitionId + ", awaitTimestamp=" + now + ']', e); - } - - return primaryReplica.getLeaseholder(); - }); - } - /** * Finishes a transaction. This operation is idempotent. * @@ -1694,7 +1626,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @return CompletableFuture of void. 
*/ // TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615 - private CompletableFuture<Void> processTxCleanupAction(TxCleanupReplicaRequest request) { + private CompletableFuture<Void> processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) { try { closeAllTransactionCursors(request.txId()); } catch (Exception e) { @@ -1711,22 +1643,19 @@ public class PartitionReplicaListener implements ReplicaListener { HybridTimestamp commandTimestamp = hybridClock.now(); return reliableCatalogVersionFor(commandTimestamp) - .thenCompose(catalogVersion -> - applyCleanupCommand( - request.txId(), - request.commit(), - request.commitTimestamp(), - request.commitTimestampLong(), - catalogVersion - )) - .thenApply(unused -> res); + .thenCompose(catalogVersion -> { + applyWriteIntentSwitchCommand( + request.txId(), + request.commit(), + request.commitTimestamp(), + request.commitTimestampLong(), + catalogVersion + ); + + return nullCompletedFuture(); + }); } else { - return completedFuture(res); - } - }) - .thenAccept(res -> { - if (res.hadUpdateFutures() || res.hadReadFutures()) { - releaseTxLocks(request.txId()); + return nullCompletedFuture(); } }); } @@ -1759,14 +1688,14 @@ public class PartitionReplicaListener implements ReplicaListener { .thenApply(v -> new FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty())); } - private CompletableFuture<Void> applyCleanupCommand( + private CompletableFuture<Void> applyWriteIntentSwitchCommand( UUID transactionId, boolean commit, HybridTimestamp commitTimestamp, long commitTimestampLong, int catalogVersion ) { - TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand() + WriteIntentSwitchCommand wiSwitchCmd = MSG_FACTORY.writeIntentSwitchCommand() .txId(transactionId) .commit(commit) .commitTimestampLong(commitTimestampLong) @@ -1775,11 +1704,11 @@ public class PartitionReplicaListener implements ReplicaListener { 
.requiredCatalogVersion(catalogVersion) .build(); - storageUpdateHandler.handleTransactionCleanup(transactionId, commit, commitTimestamp); + storageUpdateHandler.switchWriteIntents(transactionId, commit, commitTimestamp); CompletableFuture<Object> resultFuture = new CompletableFuture<>(); - applyCmdWithRetryOnSafeTimeReorderException(txCleanupCmd, resultFuture); + applyCmdWithRetryOnSafeTimeReorderException(wiSwitchCmd, resultFuture); return resultFuture .exceptionally(e -> { @@ -3485,7 +3414,7 @@ public class PartitionReplicaListener implements ReplicaListener { // The cleanup for this row has already been triggered. For example, we are resolving a write intent for an RW transaction // and a concurrent RO transaction resolves the same row, hence computeIfAbsent. return txManager.executeCleanupAsync(() -> - inBusyLock(busyLock, () -> storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, commitTimestamp)) + inBusyLock(busyLock, () -> storageUpdateHandler.switchWriteIntents(txId, txState == COMMITED, commitTimestamp)) ).whenComplete((unused, e) -> { if (e != null) { LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java index 2fc1ad4f99..7e08c7c9ce 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java @@ -47,7 +47,7 @@ public class IndexCleanupTest extends IndexBaseTest { assertThat(sortedInnerStorage.allRowsIds(), contains(rowId)); assertThat(hashInnerStorage.allRowsIds(), contains(rowId)); - storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null); + storageUpdateHandler.switchWriteIntents(getTxId(), false, null); assertEquals(0, storage.rowsCount()); 
assertTrue(pkInnerStorage.allRowsIds().isEmpty()); @@ -94,7 +94,7 @@ public class IndexCleanupTest extends IndexBaseTest { writer.addWrite(storageUpdateHandler, rowUuid, row); writer.addWrite(storageUpdateHandler, rowUuid, null); - storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null); + storageUpdateHandler.switchWriteIntents(getTxId(), false, null); assertEquals(0, storage.rowsCount()); assertTrue(pkInnerStorage.allRowsIds().isEmpty()); @@ -119,7 +119,7 @@ public class IndexCleanupTest extends IndexBaseTest { writer.addWrite(storageUpdateHandler, rowUuid2, binaryRow(key2, value)); - storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null); + storageUpdateHandler.switchWriteIntents(getTxId(), false, null); assertEquals(1, storage.rowsCount()); @@ -152,7 +152,7 @@ public class IndexCleanupTest extends IndexBaseTest { writer.addWrite(storageUpdateHandler, rowUuid, row2); - storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null); + storageUpdateHandler.switchWriteIntents(getTxId(), false, null); assertEquals(1, storage.rowsCount()); @@ -173,7 +173,7 @@ public class IndexCleanupTest extends IndexBaseTest { writer.addWrite(storageUpdateHandler, rowUuid, null); - storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null); + storageUpdateHandler.switchWriteIntents(getTxId(), false, null); assertEquals(1, storage.rowsCount()); assertThat(pkInnerStorage.allRowsIds(), contains(rowId)); @@ -194,7 +194,7 @@ public class IndexCleanupTest extends IndexBaseTest { writer.addWrite(storageUpdateHandler, rowUuid, row); - storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null); + storageUpdateHandler.switchWriteIntents(getTxId(), false, null); assertEquals(1, storage.rowsCount()); assertThat(pkInnerStorage.allRowsIds(), contains(rowId)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java index 79ad4870ac..a3c2e5ecf7 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java @@ -167,7 +167,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { assertEquals(3, storage.rowsCount()); - storageUpdateHandler.handleTransactionCleanup(txUuid, false, null); + storageUpdateHandler.switchWriteIntents(txUuid, false, null); assertEquals(0, storage.rowsCount()); } @@ -192,7 +192,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { // We have three writes to the storage. verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(), anyInt()); - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); assertEquals(3, storage.rowsCount()); // Those writes resulted in three commits. @@ -202,7 +202,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { clearInvocations(storage); // And run cleanup again for the same transaction. - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); assertEquals(3, storage.rowsCount()); // And no invocation after, meaning idempotence of the cleanup. @@ -240,7 +240,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { // We have three writes to the storage. verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(), anyInt()); - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); assertEquals(3, storage.rowsCount()); // Those writes resulted in three commits. 
@@ -250,7 +250,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { clearInvocations(storage); // And run cleanup again for the same transaction. - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); assertEquals(3, storage.rowsCount()); // And no invocation after, meaning idempotence of the cleanup. @@ -279,7 +279,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { verify(storage, times(2)).addWrite(any(), any(), any(), anyInt(), anyInt()); // And run cleanup again for the same transaction. - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); ReadResult resultAfterDelete1 = storage.read(new RowId(partitionId.partitionId(), id1), HybridTimestamp.MAX_VALUE); assertEquals(row1, resultAfterDelete1.binaryRow()); @@ -315,7 +315,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { assertEquals(3, storage.rowsCount()); // Now run cleanup. - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); // But the loss of the state results in no cleanup, and the entries are still write intents. verify(storage, never()).commitWrite(any(), any()); @@ -326,7 +326,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleWriteIntentRead(txUuid, new RowId(PARTITION_ID, row1Id)); // Run the cleanup. - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); // Only the discovered write intent was committed, the other two are still write intents. 
verify(storage, times(1)).commitWrite(any(), any()); @@ -346,7 +346,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { clearInvocations(storage); // Run cleanup for the original transaction - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); // Only those two entries will be affected. verify(storage, times(2)).commitWrite(any(), any()); @@ -384,7 +384,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { assertEquals(3, storage.rowsCount()); // Now run cleanup. - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); // But the loss of the state results in no cleanup, and the entries are still write intents. verify(storage, never()).commitWrite(any(), any()); @@ -395,7 +395,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleWriteIntentRead(txUuid, new RowId(PARTITION_ID, row1Id)); // Run the cleanup. - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); // Only the discovered write intent was committed, the other two are still write intents. verify(storage, times(1)).commitWrite(any(), any()); @@ -420,7 +420,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { clearInvocations(storage); // Run cleanup for the original transaction - storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs); + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); // Only those two entries will be affected. 
verify(storage, times(2)).commitWrite(any(), any()); @@ -461,7 +461,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleUpdate(committedTx, rowId, partitionId, row1, true, null, null, null); - storageUpdateHandler.handleTransactionCleanup(committedTx, true, commitTs); + storageUpdateHandler.switchWriteIntents(committedTx, true, commitTs); assertEquals(1, storage.rowsCount()); @@ -502,7 +502,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleUpdateAll(committedTx, rowsToUpdate, partitionId, true, null, null); - storageUpdateHandler.handleTransactionCleanup(committedTx, true, commitTs); + storageUpdateHandler.switchWriteIntents(committedTx, true, commitTs); assertEquals(1, storage.rowsCount()); @@ -712,7 +712,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, row1, true, null, null, null); - storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + storageUpdateHandler.switchWriteIntents(committed1, true, commitTs); assertEquals(1, storage.rowsCount()); @@ -768,7 +768,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate, partitionId, true, null, null); - storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + storageUpdateHandler.switchWriteIntents(committed1, true, commitTs); assertEquals(1, storage.rowsCount()); @@ -828,7 +828,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, row1, true, null, null, null); - storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + storageUpdateHandler.switchWriteIntents(committed1, true, commitTs); assertEquals(1, storage.rowsCount()); @@ -886,7 +886,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { 
storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate, partitionId, true, null, null); - storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + storageUpdateHandler.switchWriteIntents(committed1, true, commitTs); assertEquals(1, storage.rowsCount()); @@ -948,7 +948,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, row1, true, null, null, null); - storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + storageUpdateHandler.switchWriteIntents(committed1, true, commitTs); assertEquals(1, storage.rowsCount()); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java index 8acabb11f3..ac7b8db3f8 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java @@ -196,14 +196,14 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { public void testTxCleanupCommand() throws Exception { HybridClock clock = new HybridClockImpl(); - TxCleanupCommand cmd = msgFactory.txCleanupCommand() + WriteIntentSwitchCommand cmd = msgFactory.writeIntentSwitchCommand() .txId(UUID.randomUUID()) .commit(true) .commitTimestampLong(clock.nowLong()) .txCoordinatorId(UUID.randomUUID().toString()) .build(); - TxCleanupCommand readCmd = copyCommand(cmd); + WriteIntentSwitchCommand readCmd = copyCommand(cmd); assertEquals(cmd.txId(), readCmd.txId()); assertEquals(cmd.commit(), readCmd.commit()); @@ -251,14 +251,14 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { 
.commitTimestampLong(finishTxCommand.commitTimestampLong()) .txCoordinatorId(finishTxCommand.txCoordinatorId()) .build(); - } else if (cmd instanceof TxCleanupCommand) { - TxCleanupCommand txCleanupCommand = (TxCleanupCommand) cmd; - - return (T) msgFactory.txCleanupCommand() - .txId(txCleanupCommand.txId()) - .commit(txCleanupCommand.commit()) - .commitTimestampLong(txCleanupCommand.commitTimestampLong()) - .txCoordinatorId(txCleanupCommand.txCoordinatorId()) + } else if (cmd instanceof WriteIntentSwitchCommand) { + WriteIntentSwitchCommand writeIntentSwitchCommand = (WriteIntentSwitchCommand) cmd; + + return (T) msgFactory.writeIntentSwitchCommand() + .txId(writeIntentSwitchCommand.txId()) + .commit(writeIntentSwitchCommand.commit()) + .commitTimestampLong(writeIntentSwitchCommand.commitTimestampLong()) + .txCoordinatorId(writeIntentSwitchCommand.txCoordinatorId()) .build(); } else if (cmd instanceof UpdateCommand) { UpdateCommand updateCommand = (UpdateCommand) cmd; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index 8d2302ecf2..e278eb241f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -87,8 +87,8 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; import org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage; -import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; +import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; @@ -338,8 +338,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { UpdateCommand updateCommand = mock(UpdateCommand.class); when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - TxCleanupCommand txCleanupCommand = mock(TxCleanupCommand.class); - when(txCleanupCommand.safeTime()).thenAnswer(v -> hybridClock.now()); + WriteIntentSwitchCommand writeIntentSwitchCommand = mock(WriteIntentSwitchCommand.class); + when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> hybridClock.now()); SafeTimeSyncCommand safeTimeSyncCommand = mock(SafeTimeSyncCommand.class); when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> hybridClock.now()); @@ -351,7 +351,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { commandListener.onWrite(List.of( writeCommandCommandClosure(3, 1, updateCommand, commandClosureResultCaptor), writeCommandCommandClosure(10, 1, updateCommand, commandClosureResultCaptor), - writeCommandCommandClosure(4, 1, txCleanupCommand, commandClosureResultCaptor), + writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor), writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor) ).iterator()); @@ -629,7 +629,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { .txCoordinatorId(UUID.randomUUID().toString()) .build()); - invokeBatchedCommand(msgFactory.txCleanupCommand() + invokeBatchedCommand(msgFactory.writeIntentSwitchCommand() .txId(txId) .commit(true) .commitTimestampLong(commitTimestamp.longValue()) @@ -672,7 +672,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { 
.txCoordinatorId(UUID.randomUUID().toString()) .build()); - invokeBatchedCommand(msgFactory.txCleanupCommand() + invokeBatchedCommand(msgFactory.writeIntentSwitchCommand() .txId(txId) .commit(true) .commitTimestampLong(commitTimestamp.longValue()) @@ -710,7 +710,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { .txCoordinatorId(UUID.randomUUID().toString()) .build()); - invokeBatchedCommand(msgFactory.txCleanupCommand() + invokeBatchedCommand(msgFactory.writeIntentSwitchCommand() .txId(txId) .commit(true) .commitTimestampLong(commitTimestamp.longValue()) @@ -759,7 +759,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { HybridTimestamp commitTimestamp = hybridClock.now(); - txIds.forEach(txId -> invokeBatchedCommand(msgFactory.txCleanupCommand() + txIds.forEach(txId -> invokeBatchedCommand(msgFactory.writeIntentSwitchCommand() .txId(txId) .commit(true) .commitTimestampLong(commitTimestamp.longValue()) @@ -802,7 +802,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { HybridTimestamp commitTimestamp = hybridClock.now(); - txIds.forEach(txId -> invokeBatchedCommand(msgFactory.txCleanupCommand() + txIds.forEach(txId -> invokeBatchedCommand(msgFactory.writeIntentSwitchCommand() .txId(txId) .commit(true) .commitTimestampLong(commitTimestamp.longValue()) @@ -879,7 +879,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { long commitTimestamp = hybridClock.nowLong(); txIds.forEach(txId -> invokeBatchedCommand( - msgFactory.txCleanupCommand() + msgFactory.writeIntentSwitchCommand() .txId(txId) .commit(true) .commitTimestampLong(commitTimestamp) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java index 28fc8e54ea..8fc5b2a2eb 100644 --- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table.distributed.replication; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.apache.ignite.internal.tx.TxState.isFinalState; @@ -24,14 +26,13 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; @@ -44,7 +45,6 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; -import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.placementdriver.TestPlacementDriver; import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; @@ -115,10 +115,12 @@ public class 
PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTes }).when(txManager).executeCleanupAsync(any(Supplier.class)); doAnswer(invocation -> { - UUID txId = invocation.getArgument(2); - TablePartitionId partitionId = invocation.getArgument(1); - return cleanupCallback.apply(txId, partitionId); - }).when(txManager).cleanup(anyString(), any(), any(), anyBoolean(), any()); + UUID txId = invocation.getArgument(3); + Collection<TablePartitionId> partitions = invocation.getArgument(0); + return allOf(partitions.stream() + .map(partitionId -> cleanupCallback.apply(txId, partitionId)) + .toArray(size -> new CompletableFuture<?>[size])); + }).when(txManager).cleanup(any(), anyBoolean(), any(), any()); partitionReplicaListener = new PartitionReplicaListener( new TestMvPartitionStorage(PART_ID), @@ -187,7 +189,7 @@ public class PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTes assertThat(exceptionCounter.get(), greaterThan(0)); if (exceptionCounter.decrementAndGet() > 0) { - throw new RuntimeException("test exception"); + return failedFuture(new RuntimeException("test exception")); } return nullCompletedFuture(); @@ -201,33 +203,4 @@ public class PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTes assertEquals(0, exceptionCounter.get()); } - - @Test - public void testCantGetPrimaryReplicaForEnlistedPartition() throws InterruptedException { - UUID tx0 = TestTransactionIds.newTransactionId(); - TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID); - txStateStorage.put(tx0, new TxMeta(TxState.COMMITED, List.of(part0), null)); - - cleanupCallback = (tx, partId) -> nullCompletedFuture(); - - CompletableFuture<ReplicaMeta> primaryReplicaFuture = new CompletableFuture<>(); - placementDriver.setAwaitPrimaryReplicaFunction((groupId, timestamp) -> primaryReplicaFuture); - - PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now()); - 
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters), willSucceedIn(1, SECONDS)); - - assertFalse(txStateStorage.get(tx0).locksReleased()); - - Thread primaryReplicaFutureCompleteThread = - new Thread(() -> primaryReplicaFuture.completeExceptionally(new RuntimeException("test exception"))); - primaryReplicaFutureCompleteThread.start(); - - assertFalse(txStateStorage.get(tx0).locksReleased()); - - placementDriver.setAwaitPrimaryReplicaFunction(null); - - primaryReplicaFutureCompleteThread.join(); - - assertTrue(txStateStorage.get(tx0).locksReleased()); - } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 7842f1d076..5a7cd8b007 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -127,9 +127,9 @@ import org.apache.ignite.internal.table.distributed.command.CatalogVersionAware; import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; import org.apache.ignite.internal.table.distributed.command.PartitionCommand; import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; -import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommandImpl; +import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; @@ -228,10 +228,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { private final LockManager lockManager = new HeapLockManager(); private final Function<PartitionCommand, CompletableFuture<?>> defaultMockRaftFutureClosure = cmd -> { - if (cmd instanceof TxCleanupCommand) { + if (cmd instanceof WriteIntentSwitchCommand) { Set<RowId> rows = pendingRows.remove(cmd.txId()); - HybridTimestamp commitTimestamp = ((TxCleanupCommand) cmd).commitTimestamp(); + HybridTimestamp commitTimestamp = ((WriteIntentSwitchCommand) cmd).commitTimestamp(); assertNotNull(commitTimestamp); if (rows != null) { @@ -1303,7 +1303,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, UUID.randomUUID().toString(), commitPartitionId, now)); CompletableFuture<?> replicaCleanupFut = partitionReplicaListener.invoke( - TX_MESSAGES_FACTORY.txCleanupReplicaRequest() + TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() .groupId(grpId) .txId(txId) .commit(true) @@ -1467,7 +1467,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } private CompletableFuture<?> beginAndAbortTx() { - when(txManager.cleanup(any(), any(), any(), anyBoolean(), any())).thenReturn(nullCompletedFuture()); + when(txManager.cleanup(any(), anyBoolean(), any(), any())).thenReturn(nullCompletedFuture()); HybridTimestamp beginTimestamp = clock.now(); UUID txId = transactionIdFor(beginTimestamp); @@ -1529,7 +1529,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } private CompletableFuture<?> beginAndCommitTx() { - when(txManager.cleanup(any(), any(), any(), anyBoolean(), any())).thenReturn(nullCompletedFuture()); + when(txManager.cleanup(any(), anyBoolean(), any(), any())).thenReturn(nullCompletedFuture()); HybridTimestamp beginTimestamp = clock.now(); UUID txId = 
transactionIdFor(beginTimestamp); @@ -2247,7 +2247,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .thenReturn(List.of( tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col"))) )); - when(txManager.cleanup(any(), any(), any(), anyBoolean(), any())).thenReturn(nullCompletedFuture()); + when(txManager.cleanup(any(), anyBoolean(), any(), any())).thenReturn(nullCompletedFuture()); AtomicReference<Boolean> committed = interceptFinishTxCommand(); @@ -2476,7 +2476,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, UUID.randomUUID().toString(), commitPartitionId, commitTs)); partitionReplicaListener.invoke( - TX_MESSAGES_FACTORY.txCleanupReplicaRequest() + TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() .groupId(grpId) .txId(txId) .commit(true) diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index ec5112cd8f..e6e87bcccb 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.impl; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -94,11 +95,13 @@ import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.Lazy; import 
org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker; +import org.apache.ignite.network.AbstractMessagingService; +import org.apache.ignite.network.ChannelType; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterNodeImpl; import org.apache.ignite.network.ClusterService; -import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.TopologyService; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; @@ -425,7 +428,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { when(topologyService.localMember()).thenReturn(LOCAL_NODE); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.messagingService()).thenReturn(mock(MessagingService.class)); + + when(clusterService.messagingService()).thenReturn(new DummyMessagingService(LOCAL_NODE.name())); when(clusterService.topologyService()).thenReturn(topologyService); var txManager = new TxManagerImpl( @@ -485,4 +489,61 @@ public class DummyInternalTableImpl extends InternalTableImpl { } }; } + + /** + * Dummy messaging service for tests purposes. + * It does not provide any messaging functionality, but allows to trigger events. 
+ */ + private static class DummyMessagingService extends AbstractMessagingService { + private final String localNodeName; + + private final AtomicLong correlationIdGenerator = new AtomicLong(); + + DummyMessagingService(String localNodeName) { + this.localNodeName = localNodeName; + } + + /** {@inheritDoc} */ + @Override + public void weakSend(ClusterNode recipient, NetworkMessage msg) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public CompletableFuture<Void> send(String recipientConsistentId, ChannelType channelType, NetworkMessage msg) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> respond(ClusterNode recipient, ChannelType type, NetworkMessage msg, long correlationId) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> respond(String recipientConsistentId, ChannelType type, NetworkMessage msg, long correlationId) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, ChannelType type, NetworkMessage msg, long timeout) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<NetworkMessage> invoke(String recipientNodeId, ChannelType type, NetworkMessage msg, long timeout) { + getMessageHandlers(msg.groupType()).forEach(h -> h.onReceived(msg, localNodeName, correlationIdGenerator.getAndIncrement())); + + return nullCompletedFuture(); + } + } } diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index fc6a98888f..9a6e287120 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.tx; +import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -129,21 +130,19 @@ public interface TxManager extends IgniteComponent { ); /** - * Sends cleanup request to the specified primary replica. + * Sends cleanup request to the cluster nodes that hosts primary replicas for the enlisted partitions. * - * @param primaryConsistentId A consistent id of the primary replica node. - * @param tablePartitionId Table partition id. - * @param txId Transaction id. - * @param commit {@code True} if a commit requested. + * @param partitions Enlisted partition groups. + * @param commit {@code true} if a commit requested. * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). + * @param txId Transaction id. * @return Completable future of Void. 
*/ CompletableFuture<Void> cleanup( - String primaryConsistentId, - TablePartitionId tablePartitionId, - UUID txId, + Collection<TablePartitionId> partitions, boolean commit, - @Nullable HybridTimestamp commitTimestamp + @Nullable HybridTimestamp commitTimestamp, + UUID txId ); /** diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java new file mode 100644 index 0000000000..0df5bc3495 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.tx.impl; + +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.util.ExceptionUtils.withCause; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.tx.TransactionException; + +/** + * A helper class to retrieve primary replicas with exception handling. + */ +public class PlacementDriverHelper { + /** The logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PlacementDriverHelper.class); + + private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10; + + /** Placement driver. */ + private final PlacementDriver placementDriver; + + /** A hybrid logical clock. */ + private final HybridClock clock; + + /** + * Constructor. + * + * @param placementDriver Placement driver. + * @param clock A hybrid logical clock. + */ + public PlacementDriverHelper(PlacementDriver placementDriver, HybridClock clock) { + this.placementDriver = placementDriver; + this.clock = clock; + } + + /** + * Wait for primary replica to appear for the provided partition. + * + * @param partitionId Partition id. 
+ * @return Future that completes with node id that is a primary for the provided partition, or completes with exception if no primary + * appeared during the await timeout. + */ + public CompletableFuture<ReplicaMeta> awaitPrimaryReplicaWithExceptionHandling(TablePartitionId partitionId) { + HybridTimestamp timestamp = clock.now(); + + return placementDriver.awaitPrimaryReplica(partitionId, timestamp, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS) + .handle((primaryReplica, e) -> { + if (e != null) { + LOG.debug("Failed to retrieve primary replica for partition {}", partitionId, e); + + throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, + "Failed to get the primary replica" + + " [tablePartitionId=" + partitionId + ", awaitTimestamp=" + timestamp + ']', e); + } + + return primaryReplica; + }); + } + + /** + * Get primary replicas for the provided partitions. + * + * @param partitions A collection of partitions. + * @return A future that completes with a map of node to the partitions the node is primary for and a collection of partitions that we + * failed to find the primary for. + */ + public CompletableFuture<PartitionData> findPrimaryReplicas(Collection<TablePartitionId> partitions) { + HybridTimestamp timestamp = clock.now(); + + Map<TablePartitionId, CompletableFuture<ReplicaMeta>> primaryReplicaFutures = new HashMap<>(); + + // Please note that we are using `get primary replica` instead of `await primary replica`. + // This method is faster, yet we still have the correctness: + // If the primary replica has not changed, get will return a valid value and we'll send an unlock request to this node. + // If the primary replica has expired and get returns null (or a different node), the primary node step down logic + // will automatically release the locks on that node. All we need to do is to clean the storage. 
+ for (TablePartitionId partitionId : partitions) { + primaryReplicaFutures.put(partitionId, placementDriver.getPrimaryReplica(partitionId, timestamp)); + } + + return allOf(primaryReplicaFutures.values().toArray(new CompletableFuture<?>[0])) + .thenApply(v -> { + Map<String, Set<TablePartitionId>> partitionsByNode = new HashMap<>(); + + Set<TablePartitionId> partitionsWithoutPrimary = new HashSet<>(); + + for (Entry<TablePartitionId, CompletableFuture<ReplicaMeta>> entry : primaryReplicaFutures.entrySet()) { + // Safe to call join, the future has already finished. + ReplicaMeta meta = entry.getValue().join(); + + TablePartitionId partition = entry.getKey(); + + if (meta != null) { + partitionsByNode.computeIfAbsent(meta.getLeaseholder(), s -> new HashSet<>()) + .add(partition); + } else { + partitionsWithoutPrimary.add(partition); + } + } + return new PartitionData(partitionsByNode, partitionsWithoutPrimary); + }); + } + + /** + * The result of retrieving primary replicas for a collection of partitions. + */ + public static class PartitionData { + final Map<String, Set<TablePartitionId>> partitionsByNode; + + final Set<TablePartitionId> partitionsWithoutPrimary; + + PartitionData(Map<String, Set<TablePartitionId>> partitionsByNode, Set<TablePartitionId> partitionsWithoutPrimary) { + this.partitionsByNode = partitionsByNode; + this.partitionsWithoutPrimary = partitionsWithoutPrimary; + } + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java new file mode 100644 index 0000000000..6671e68603 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static java.util.concurrent.CompletableFuture.allOf; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.message.TxCleanupMessage; +import org.apache.ignite.internal.tx.message.TxMessageGroup; +import org.apache.ignite.internal.tx.message.TxMessagesFactory; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.NetworkMessage; +import org.jetbrains.annotations.Nullable; + +/** + * Handles TX Cleanup request ({@link TxCleanupMessage}). + */ +public class TxCleanupRequestHandler { + /** Tx messages factory. */ + private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); + + /** Cluster service. */ + private final ClusterService clusterService; + + /** Lock manager. */ + private final LockManager lockManager; + + /** Hybrid clock. */ + private final HybridClock hybridClock; + + /** Cleanup processor. 
*/ + private final WriteIntentSwitchProcessor writeIntentSwitchProcessor; + + /** + * The constructor. + * + * @param clusterService Cluster service. + * @param lockManager Lock manager. + * @param clock A hybrid logical clock. + * @param writeIntentSwitchProcessor A cleanup processor. + */ + public TxCleanupRequestHandler( + ClusterService clusterService, + LockManager lockManager, + HybridClock clock, + WriteIntentSwitchProcessor writeIntentSwitchProcessor + ) { + this.clusterService = clusterService; + this.lockManager = lockManager; + this.hybridClock = clock; + this.writeIntentSwitchProcessor = writeIntentSwitchProcessor; + } + + /** + * Starts the processor. + */ + public void start() { + clusterService.messagingService().addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> { + if (msg instanceof TxCleanupMessage) { + processTxCleanup((TxCleanupMessage) msg, sender, correlationId); + } + }); + } + + public void stop() { + } + + private void processTxCleanup(TxCleanupMessage txCleanupMessage, String senderId, @Nullable Long correlationId) { + assert correlationId != null; + + Map<TablePartitionId, CompletableFuture<?>> writeIntentSwitches = new HashMap<>(); + + // These cleanups will all be local. + Collection<ReplicationGroupId> groups = txCleanupMessage.groups(); + + if (groups != null) { + for (ReplicationGroupId group : groups) { + writeIntentSwitches.put((TablePartitionId) group, + writeIntentSwitchProcessor.switchLocalWriteIntents( + (TablePartitionId) group, + txCleanupMessage.txId(), + txCleanupMessage.commit(), + txCleanupMessage.commitTimestamp() + )); + } + } + // First trigger the cleanup to properly release the locks if we know all affected partitions on this node. + // If the partition collection is empty (likely to be the recovery case)- just run 'release locks'. 
+ allOf(writeIntentSwitches.values().toArray(new CompletableFuture<?>[0])) + .whenComplete((unused, ex) -> { + releaseTxLocks(txCleanupMessage.txId()); + + NetworkMessage msg; + if (ex == null) { + msg = prepareResponse(); + } else { + msg = prepareErrorResponse(ex); + + // Run durable cleanup for the partitions that we failed to cleanup properly. + // No need to wait on this future. + writeIntentSwitches.forEach((groupId, future) -> { + if (future.isCompletedExceptionally()) { + writeIntentSwitchProcessor.switchWriteIntentsWithRetry( + txCleanupMessage.commit(), + txCleanupMessage.commitTimestamp(), + txCleanupMessage.txId(), + groupId + ); + } + }); + } + + clusterService.messagingService().respond(senderId, msg, correlationId); + }); + } + + private void releaseTxLocks(UUID txId) { + lockManager.locks(txId).forEachRemaining(lockManager::release); + } + + private NetworkMessage prepareResponse() { + return FACTORY + .txCleanupMessageResponse() + .timestampLong(hybridClock.nowLong()) + .build(); + } + + private NetworkMessage prepareErrorResponse(Throwable th) { + return FACTORY + .txCleanupMessageErrorResponse() + .throwable(th) + .timestampLong(hybridClock.nowLong()) + .build(); + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java new file mode 100644 index 0000000000..46815c062f --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static java.util.concurrent.CompletableFuture.allOf; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler; +import org.apache.ignite.internal.util.CompletableFutures; +import org.jetbrains.annotations.Nullable; + +/** + * Sends TX Cleanup request. + */ +public class TxCleanupRequestSender { + /** Placement driver helper. */ + private final PlacementDriverHelper placementDriverHelper; + + /** Cleanup processor. */ + private final WriteIntentSwitchProcessor writeIntentSwitchProcessor; + + private final TxMessageSender txMessageSender; + + /** + * The constructor. + * + * @param txMessageSender Message sender. + * @param placementDriverHelper Placement driver helper. + * @param writeIntentSwitchProcessor A cleanup processor. 
+ */ + public TxCleanupRequestSender( + TxMessageSender txMessageSender, + PlacementDriverHelper placementDriverHelper, + WriteIntentSwitchProcessor writeIntentSwitchProcessor + ) { + this.txMessageSender = txMessageSender; + this.placementDriverHelper = placementDriverHelper; + this.writeIntentSwitchProcessor = writeIntentSwitchProcessor; + } + + /** + * Sends cleanup request to the primary nodes of each one of {@code partitions}. + * + * @param partitions Enlisted partition groups. + * @param commit {@code true} if a commit requested. + * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). + * @param txId Transaction id. + * @return Completable future of Void. + */ + public CompletableFuture<Void> cleanup( + Collection<TablePartitionId> partitions, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + return placementDriverHelper.findPrimaryReplicas(partitions) + .thenCompose(partitionData -> { + switchWriteIntentsOnPartitions(commit, commitTimestamp, txId, partitionData.partitionsWithoutPrimary); + + return cleanupPartitions(partitionData.partitionsByNode, commit, commitTimestamp, txId); + }); + } + + private void switchWriteIntentsOnPartitions( + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId, + Set<TablePartitionId> noPrimaryFound + ) { + for (TablePartitionId partition : noPrimaryFound) { + // Okay, no primary found for that partition. + // Means the old one is no longer primary thus the locks were released. + // All we need to do is to wait for the new primary to appear and cleanup write intents. 
+ writeIntentSwitchProcessor.switchWriteIntentsWithRetry(commit, commitTimestamp, txId, partition); + } + } + + private CompletableFuture<Void> cleanupPartitions( + Map<String, Set<TablePartitionId>> partitionsByNode, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>(); + + for (Entry<String, Set<TablePartitionId>> entry : partitionsByNode.entrySet()) { + String node = entry.getKey(); + Set<TablePartitionId> nodePartitions = entry.getValue(); + + cleanupFutures.add(sendCleanupMessageWithRetries(commit, commitTimestamp, txId, node, nodePartitions)); + } + + return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0])); + } + + private CompletableFuture<Void> sendCleanupMessageWithRetries( + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId, + String node, + Collection<TablePartitionId> partitions + ) { + Collection<ReplicationGroupId> enlistedPartitions = (Collection<ReplicationGroupId>) (Collection<?>) partitions; + + return txMessageSender.cleanup(node, enlistedPartitions, txId, commit, commitTimestamp) + .handle((networkMessage, throwable) -> { + if (throwable != null) { + if (TransactionFailureHandler.isRecoverable(throwable)) { + // In the case of a failure we repeat the process, but start with finding correct primary replicas + // for this subset of partitions. If nothing changed in terms of the nodes and primaries + // we eventually will call ourselves with the same parameters. + // On the other hand (for example if this node has died) we will + // either have a new mapping of primary to its partitions + // or will run `switchWriteIntentsOnPartitions` for partitions with no primary. + // At the end of the day all write intents will be properly converted. 
+ return cleanup(partitions, commit, commitTimestamp, txId); + } + + return CompletableFuture.<Void>failedFuture(throwable); + } + + return CompletableFutures.<Void>nullCompletedFuture(); + }) + .thenCompose(v -> v); + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 97f0172826..507a7e4e41 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -20,19 +20,15 @@ package org.apache.ignite.internal.tx.impl; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.CompletableFuture.supplyAsync; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.apache.ignite.internal.tx.TxState.COMMITED; import static org.apache.ignite.internal.tx.TxState.PENDING; import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness; import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; -import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR; import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_WAS_ABORTED_ERR; @@ -65,7 +61,6 @@ import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -84,8 +79,6 @@ import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.TxStateMetaFinishing; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; -import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; -import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -107,16 +100,9 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { /** Hint for maximum concurrent txns. */ private static final int MAX_CONCURRENT_TXNS = 1024; - private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10; - - /** Tx messages factory. */ - private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); - /** Transaction configuration. */ private final TransactionConfiguration txConfig; - private final ReplicaService replicaService; - /** Lock manager. */ private final LockManager lockManager; @@ -148,6 +134,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { private final PlacementDriver placementDriver; + private final PlacementDriverHelper placementDriverHelper; + private final LongSupplier idleSafeTimePropagationPeriodMsSupplier; /** Prevents double stopping of the tracker. 
*/ @@ -165,6 +153,21 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { /** Local node network identity. This id is available only after the network has started. */ private String localNodeId; + /** + * Server cleanup processor. + */ + private final TxCleanupRequestHandler txCleanupRequestHandler; + + /** + * Cleanup request sender. + */ + private final TxCleanupRequestSender txCleanupRequestSender; + + /** + * Transaction message sender. + */ + private final TxMessageSender txMessageSender; + /** * The constructor. * @@ -188,7 +191,6 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { LongSupplier idleSafeTimePropagationPeriodMsSupplier ) { this.txConfig = txConfig; - this.replicaService = replicaService; this.lockManager = lockManager; this.clock = clock; this.transactionIdGenerator = transactionIdGenerator; @@ -196,6 +198,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { this.placementDriver = placementDriver; this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; + placementDriverHelper = new PlacementDriverHelper(placementDriver, clock); + int cpus = Runtime.getRuntime().availableProcessors(); cleanupExecutor = new ThreadPoolExecutor( @@ -207,6 +211,15 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { new NamedThreadFactory("tx-async-cleanup", LOG)); orphanDetector = new OrphanDetector(clusterService.topologyService(), replicaService, placementDriver, lockManager, clock); + + txMessageSender = new TxMessageSender(clusterService, replicaService, clock); + + WriteIntentSwitchProcessor writeIntentSwitchProcessor = + new WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, clusterService); + + txCleanupRequestHandler = new TxCleanupRequestHandler(clusterService, lockManager, clock, writeIntentSwitchProcessor); + + txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, placementDriverHelper, 
writeIntentSwitchProcessor); } @Override @@ -281,7 +294,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } @Override - public TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater) { + public @Nullable TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater) { return inBusyLock(busyLock, () -> txStateVolatileStorage.updateMeta(txId, oldMeta -> { TxStateMeta newMeta = updater.apply(oldMeta); @@ -410,7 +423,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { HybridTimestamp commitTimestamp, CompletableFuture<TransactionMeta> txFinishFuture ) { - return inBusyLockAsync(busyLock, () -> findPrimaryReplica(commitPartition, clock.now()) + return inBusyLockAsync(busyLock, () -> placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition) .thenCompose(meta -> makeFinishRequest( observableTimestampTracker, @@ -481,18 +494,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { LOG.debug("Finish [partition={}, node={}, term={} commit={}, txId={}, groups={}", commitPartition, primaryConsistentId, term, commit, txId, replicationGroupIds); - TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest() - .txId(txId) - .timestampLong(clock.nowLong()) - .groupId(commitPartition) - .groups(replicationGroupIds) - // In case of verification future failure transaction will be rolled back. 
- .commit(commit) - .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) - .enlistmentConsistencyToken(term) - .build(); - - return replicaService.invoke(primaryConsistentId, req) + return txMessageSender.finish(primaryConsistentId, commitPartition, replicationGroupIds, txId, term, commit, commitTimestamp) .thenRun(() -> { updateTxMeta(txId, old -> { if (isFinalState(old.txState())) { @@ -520,41 +522,6 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { }); } - private CompletableFuture<ReplicaMeta> findPrimaryReplica(TablePartitionId partitionId, HybridTimestamp now) { - return placementDriver.awaitPrimaryReplica(partitionId, now, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS) - .handle((primaryReplica, e) -> { - if (e != null) { - LOG.error("Failed to retrieve primary replica for partition {}", e, partitionId); - - throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, - "Failed to get the primary replica" - + " [tablePartitionId=" + partitionId + ", awaitTimestamp=" + now + ']', e); - } - - return primaryReplica; - }); - } - - @Override - public CompletableFuture<Void> cleanup( - String primaryConsistentId, - TablePartitionId tablePartitionId, - UUID txId, - boolean commit, - @Nullable HybridTimestamp commitTimestamp - ) { - return replicaService.invoke( - primaryConsistentId, - FACTORY.txCleanupReplicaRequest() - .groupId(tablePartitionId) - .timestampLong(clock.nowLong()) - .txId(txId) - .commit(commit) - .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) - .build() - ); - } - @Override public int finished() { return inBusyLock(busyLock, () -> (int) txStateVolatileStorage.states().stream() @@ -577,6 +544,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { txStateVolatileStorage.start(); orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs()); + + txCleanupRequestHandler.start(); } @Override @@ -586,13 +555,15 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler { } @Override - public void stop() { + public void stop() throws Exception { if (!stopGuard.compareAndSet(false, true)) { return; } busyLock.block(); + txCleanupRequestHandler.stop(); + shutdownAndAwaitTermination(cleanupExecutor, 10, TimeUnit.SECONDS); } @@ -601,6 +572,16 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { return lockManager; } + @Override + public CompletableFuture<Void> cleanup( + Collection<TablePartitionId> partitions, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + return txCleanupRequestSender.cleanup(partitions, commit, commitTimestamp, txId); + } + @Override public CompletableFuture<Void> executeCleanupAsync(Runnable runnable) { return runAsync(runnable, cleanupExecutor); @@ -808,7 +789,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } } - private static class TransactionFailureHandler { + static class TransactionFailureHandler { private static final Set<Class<? extends Throwable>> RECOVERABLE = Set.of( TimeoutException.class, IOException.class, @@ -839,4 +820,3 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } } } - diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java new file mode 100644 index 0000000000..3016c3dd96 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; + +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.message.TxMessagesFactory; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.NetworkMessage; +import org.jetbrains.annotations.Nullable; + +/** + * This class is responsible for interacting with the messaging layer. Sends transaction messages. + */ +public class TxMessageSender { + + private static final long RPC_TIMEOUT = 3000; + + /** Tx messages factory. */ + private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); + + /** Cluster service. */ + private final ClusterService clusterService; + + /** Replica service. */ + private final ReplicaService replicaService; + + /** A hybrid logical clock. */ + private final HybridClock clock; + + /** + * Constructor. + * + * @param clusterService Cluster service. + * @param replicaService Replica service. + * @param clock A hybrid logical clock.
+ */ + public TxMessageSender(ClusterService clusterService, ReplicaService replicaService, HybridClock clock) { + this.clusterService = clusterService; + this.replicaService = replicaService; + this.clock = clock; + } + + /** + * Sends WriteIntentSwitch request to the specified primary replica. + * + * @param primaryConsistentId Consistent id of the primary replica that should process the write intent switch request. + * @param tablePartitionId Table partition id. + * @param txId Transaction id. + * @param commit {@code True} if a commit requested. + * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). + * @return Completable future of Void. + */ + public CompletableFuture<Void> switchWriteIntents( + String primaryConsistentId, + TablePartitionId tablePartitionId, + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp + ) { + return replicaService.invoke( + primaryConsistentId, + FACTORY.writeIntentSwitchReplicaRequest() + .groupId(tablePartitionId) + .timestampLong(clock.nowLong()) + .txId(txId) + .commit(commit) + .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) + .build() + ); + } + + /** + * Sends a cleanup message to the specified node through the messaging service. + * + * @param primaryConsistentId Primary replica to process given cleanup request. + * @param replicationGroupIds Replication groups to include in the cleanup message. + * @param txId Transaction id. + * @param commit {@code True} if a commit requested. + * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). + * @return Completable future completed with the response message.
+ */ + public CompletableFuture<NetworkMessage> cleanup( + String primaryConsistentId, + Collection<ReplicationGroupId> replicationGroupIds, + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp + ) { + return clusterService.messagingService().invoke( + primaryConsistentId, + FACTORY.txCleanupMessage() + .txId(txId) + .commit(commit) + .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) + .timestampLong(clock.nowLong()) + .groups(replicationGroupIds) + .build(), + RPC_TIMEOUT); + } + + /** + * Sends a transaction finish replica request. + * + * @param primaryConsistentId Consistent id of the node to send the request to. + * @param commitPartition Partition to store a transaction state. + * @param replicationGroupIds Enlisted partition groups. + * @param txId Transaction id. + * @param term Value sent as the enlistment consistency token of the request. + * @param commit {@code true} if a commit requested. + * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). + * @return Completable future of Void. + */ + public CompletableFuture<Void> finish( + String primaryConsistentId, + TablePartitionId commitPartition, + Collection<ReplicationGroupId> replicationGroupIds, + UUID txId, + Long term, + boolean commit, + @Nullable HybridTimestamp commitTimestamp + ) { + return replicaService.invoke( + primaryConsistentId, + FACTORY.txFinishReplicaRequest() + .txId(txId) + .timestampLong(clock.nowLong()) + .groupId(commitPartition) + .groups(replicationGroupIds) + .commit(commit) + .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) + .enlistmentConsistencyToken(term) + .build()); + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java new file mode 100644 index 0000000000..eccd5d058d --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java @@ -0,0 +1,121 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.util.CompletableFutures; +import org.apache.ignite.network.ClusterService; +import org.jetbrains.annotations.Nullable; + +/** + * Sends requests to switch write intents (convert them to regular values on commit or remove them on abort). + */ +public class WriteIntentSwitchProcessor { + /** The logger. */ + private static final IgniteLogger LOG = Loggers.forClass(WriteIntentSwitchProcessor.class); + + private static final int ATTEMPTS_TO_SWITCH_WI = 5; + + /** Placement driver helper. */ + private final PlacementDriverHelper placementDriverHelper; + + private final TxMessageSender txMessageSender; + + /** Cluster service. */ + private final ClusterService clusterService; + + /** + * The constructor. + * + * @param placementDriverHelper Placement driver helper.
+ * @param txMessageSender Transaction message sender. + * @param clusterService Cluster service. + */ + public WriteIntentSwitchProcessor( + PlacementDriverHelper placementDriverHelper, + TxMessageSender txMessageSender, + ClusterService clusterService + ) { + this.placementDriverHelper = placementDriverHelper; + this.txMessageSender = txMessageSender; + this.clusterService = clusterService; + } + + /** + * Switches write intents on the local node by sending the write intent switch request to it. + */ + public CompletableFuture<Void> switchLocalWriteIntents( + TablePartitionId tablePartitionId, + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp + ) { + String localNodeName = clusterService.topologyService().localMember().name(); + + return txMessageSender.switchWriteIntents(localNodeName, tablePartitionId, txId, commit, commitTimestamp); + } + + /** + * Switches write intents on the primary replica of the given partition, retrying a bounded number of times on failure. + */ + public CompletableFuture<Void> switchWriteIntentsWithRetry( + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId, + TablePartitionId partitionId + ) { + return switchWriteIntentsWithRetry(commit, commitTimestamp, txId, partitionId, ATTEMPTS_TO_SWITCH_WI); + } + + // TODO https://issues.apache.org/jira/browse/IGNITE-20681 remove attempts count. + private CompletableFuture<Void> switchWriteIntentsWithRetry( + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId, + TablePartitionId partitionId, + int attempts + ) { + return placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(partitionId) + .thenCompose(leaseHolder -> + txMessageSender.switchWriteIntents(leaseHolder.getLeaseholder(), partitionId, txId, commit, commitTimestamp)) + .handle((res, ex) -> { + if (ex != null) { + if (attempts > 0) { + LOG.warn("Failed to switch write intents for Tx.
The operation will be retried [txId={}].", txId, ex); + } else { + LOG.warn("Failed to switch write intents for Tx [txId={}].", txId, ex); + } + + if (attempts > 0) { + return switchWriteIntentsWithRetry(commit, commitTimestamp, txId, partitionId, attempts - 1); + } + + return CompletableFuture.<Void>failedFuture(ex); + } + + return CompletableFutures.<Void>nullCompletedFuture(); + }) + .thenCompose(Function.identity()); + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java similarity index 73% copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java copy to modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java index 2032cf1352..774b77ac6f 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java @@ -19,30 +19,37 @@ package org.apache.ignite.internal.tx.message; import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; +import java.util.Collection; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.message.TimestampAware; +import org.apache.ignite.network.annotations.Marshallable; import org.apache.ignite.network.annotations.Transferable; import org.jetbrains.annotations.Nullable; /** - * Transaction cleanup replica request that will trigger following actions processing.
- - * <ol> - * <li>Convert all pending entries(writeIntents) to either regular values(TxState.COMMITED) or remove them (TxState.ABORTED).</li> - * <li>Release all locks that were held on local replica by given transaction.</li> - * </ol> + * Transaction cleanup message: carries the transaction outcome and the replication groups to clean up. */ -@Transferable(TxMessageGroup.TX_CLEANUP_REQUEST) -public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware { +@Transferable(TxMessageGroup.TX_CLEANUP_MSG) +public interface TxCleanupMessage extends TimestampAware { /** - * Returns transaction Id. + * Returns the id of the transaction to clean up. * * @return Transaction id. */ UUID txId(); + /** + * Returns the replication groups whose expected primary replica is the node receiving this message. + * Null when this message is sent at recovery. + * + * @return Replication groups whose expected primary replica is the receiving node. + */ + @Marshallable + @Nullable + Collection<ReplicationGroupId> groups(); + /** * Returns {@code True} if a commit request. * diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java new file mode 100644 index 0000000000..0e09ab6433 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.message; + +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Cleanup transaction message error response. + */ +@Transferable(TxMessageGroup.TX_CLEANUP_MSG_ERR_RESPONSE) +public interface TxCleanupMessageErrorResponse extends TxCleanupMessageResponse { + /** + * Returns a {@link Throwable} that was thrown while handling a cleanup message. + * + * @return {@link Throwable} that was thrown while handling a cleanup message. + */ + @Marshallable + Throwable throwable(); +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java new file mode 100644 index 0000000000..123391e0c7 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.message; + +import org.apache.ignite.internal.replicator.message.TimestampAware; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Response to a {@link TxCleanupMessage}. + */ +@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE) +public interface TxCleanupMessageResponse extends TimestampAware { +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java index 2aed7d0c01..3e22292a2a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java @@ -35,9 +35,9 @@ public class TxMessageGroup { public static final short TX_FINISH_RESPONSE = 1; /** - * Message type for {@link TxCleanupReplicaRequest}. + * Message type for {@link WriteIntentSwitchReplicaRequest}. */ - public static final short TX_CLEANUP_REQUEST = 2; + public static final short WRITE_INTENT_SWITCH_REQUEST = 2; /** * Message type for {@link TxStateCommitPartitionRequest}. @@ -58,4 +58,19 @@ public class TxMessageGroup { * Message type for {@link TxRecoveryMessage}. */ public static final short TX_RECOVERY_MSG = 6; + + /** + * Message type for {@link TxCleanupMessage}. + */ + public static final short TX_CLEANUP_MSG = 7; + + /** + * Message type for {@link TxCleanupMessageResponse}.
+ */ + public static final short TX_CLEANUP_MSG_RESPONSE = 8; + + /** + * Message type for {@link TxCleanupMessageErrorResponse}. + */ + public static final short TX_CLEANUP_MSG_ERR_RESPONSE = 9; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java similarity index 80% rename from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java rename to modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java index 2032cf1352..d4417255ff 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java @@ -27,15 +27,11 @@ import org.apache.ignite.network.annotations.Transferable; import org.jetbrains.annotations.Nullable; /** - * Transaction cleanup replica request that will trigger following actions processing. - * - * <ol> - * <li>Convert all pending entries(writeIntents) to either regular values(TxState.COMMITED) or remove them (TxState.ABORTED).</li> - * <li>Release all locks that were held on local replica by given transaction.</li> - * </ol> + * A replica request that triggers either the conversion of all pending entries (write intents) to regular values (TxState.COMMITED) + * or their removal (TxState.ABORTED). */ -@Transferable(TxMessageGroup.TX_CLEANUP_REQUEST) -public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware { +@Transferable(TxMessageGroup.WRITE_INTENT_SWITCH_REQUEST) +public interface WriteIntentSwitchReplicaRequest extends ReplicaRequest, TimestampAware { /** * Returns transaction Id.
* diff --git a/modules/transactions/tech-notes/cleanup.puml b/modules/transactions/tech-notes/cleanup.puml new file mode 100644 index 0000000000..cf0f386c2f --- /dev/null +++ b/modules/transactions/tech-notes/cleanup.puml @@ -0,0 +1,51 @@ +@startuml + +!pragma teoz true +box "Commit partition" #LightGreen +participant PartitionReplicaListener +box "TxManager" +participant TxCleanupRequestSender +participant WriteIntentSwitchProcessor as WISP1 +end box +participant PlacementDriver + +end box + +box "Partition primary #i" #LightBlue +box "TxManager" +participant TxCleanupRequestHandler +participant WriteIntentSwitchProcessor as WISP2 +end box +participant PartitionReplicaListener as PRL2 +participant LockManager +end box + +box "New primary for a partition\n (old one stepped down)" #LightGray +participant PartitionReplicaListener as PRL3 +end box +rnote over PartitionReplicaListener + Finish transaction + (out of the scope) +endrnote +PartitionReplicaListener -> TxCleanupRequestSender: cleanup + +TxCleanupRequestSender -> PlacementDriver: For each enlisted partition\n do 'getPrimaryReplica' +return + +alt Executes for each available primary node + +TxCleanupRequestSender -> TxCleanupRequestHandler: send \ntxCleanupMessage +TxCleanupRequestHandler -> WISP2: switchWriteIntent +WISP2 -> PRL2: send \nwriteIntentSwitchReplicaRequest +return +WISP2 --> TxCleanupRequestHandler +TxCleanupRequestHandler->LockManager: releaseTxLocks +return +TxCleanupRequestHandler --> TxCleanupRequestSender +else no primary found +TxCleanupRequestSender -> WISP1: SwitchWriteIntentWithRetry +WISP1 -> PRL3: send \nwriteIntentSwitchReplicaRequest +end +TxCleanupRequestSender --> PartitionReplicaListener +PartitionReplicaListener -> PartitionReplicaListener : markLocksReleased +@enduml