This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new a20d6eb81a0 Implement TxCleanupRecoveryRequest processing for zone replica (#5371) a20d6eb81a0 is described below commit a20d6eb81a0b7c5689cd25c843880b10b69e2eca Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Mon Mar 10 19:15:49 2025 +0400 Implement TxCleanupRecoveryRequest processing for zone replica (#5371) --- .../apache/ignite/client/fakes/FakeTxManager.java | 2 +- .../ItColocationTxCleanupRecoveryTest.java | 141 +++++++++++++++++++++ .../replicator/ItColocationTxRecoveryTest.java | 23 +--- .../partition/replicator/fixtures/Node.java | 36 ++++++ .../replicator/ZonePartitionReplicaListener.java | 11 +- .../handlers/TxCleanupRecoveryRequestHandler.java | 107 ++++++++++++++++ .../replicator/PartitionReplicaListener.java | 60 +-------- .../org/apache/ignite/internal/tx/TxManager.java | 3 +- .../internal/tx/impl/TransactionInflights.java | 5 +- .../ignite/internal/tx/impl/TxManagerImpl.java | 12 +- .../ignite/internal/tx/impl/TxMessageSender.java | 8 +- 11 files changed, 313 insertions(+), 95 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 6ef2392808d..faa6f1e305f 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -226,7 +226,7 @@ public class FakeTxManager implements TxManager { @Override public CompletableFuture<Void> cleanup( - TablePartitionId commitPartitionId, + ReplicationGroupId commitPartitionId, Collection<EnlistedPartitionGroup> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxCleanupRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxCleanupRecoveryTest.java new file mode 100644 index 00000000000..94b0a65e57b --- /dev/null +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxCleanupRecoveryTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.UUID; +import java.util.function.Predicate; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.PartitionTimestampCursor; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxCleanupMessage; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.Test; + +// TODO: remove after switching to per-zone partitions https://issues.apache.org/jira/browse/IGNITE-22522 +class ItColocationTxCleanupRecoveryTest extends ItAbstractColocationTest { + @Test + void txGetsCleanedUpOnPrimaryChange() throws Exception { + startCluster(3); + + Node node0 = cluster.get(0); + + // Create a zone with a single partition on every node. + int zoneId = createZone(node0, TEST_ZONE_NAME, 1, cluster.size()); + + createTable(node0, TEST_ZONE_NAME, TEST_TABLE_NAME1); + + cluster.forEach(Node::waitForMetadataCompletenessAtNow); + + ReplicaMeta primaryReplica = node0.getPrimaryReplica(zoneId); + Node primaryReplicaNode = cluster.stream() + .filter(node -> node.name.equals(primaryReplica.getLeaseholder())) + .findAny().orElseThrow(); + + // We'll use this node as transaction coordinator, so let's forbid it sending cleanup messages. + disallowTxCleanupMessagesFrom(primaryReplicaNode); + + KeyValueView<Long, Integer> kvView = node0.tableManager + .table(TEST_TABLE_NAME1) + .keyValueView(Long.class, Integer.class); + + Transaction transaction = primaryReplicaNode.transactions().begin(); + + kvView.put(transaction, 42L, 69); + waitTillOneWriteIntentAppearsOnAllNodes(TEST_TABLE_NAME1); + transaction.rollbackAsync(); + + waitTillTxStateAppearsOnAllNodes(zoneId); + + primaryReplicaNode.stop(); + cluster.remove(primaryReplicaNode); + + waitTillAllWriteIntentsGetRemovedOnAllNodes(TEST_TABLE_NAME1); + } + + private static void disallowTxCleanupMessagesFrom(Node primaryReplicaNode) { + primaryReplicaNode.dropMessages((destinationName, message) -> message instanceof TxCleanupMessage); + } + + private void waitTillOneWriteIntentAppearsOnAllNodes(String tableName) throws InterruptedException { + waitOnAllNodes("A write intent should appear on every node", tableName, storage -> { + List<ReadResult> readResults = readAll(storage); + return readResults.size() == 1 && readResults.stream().allMatch(ReadResult::isWriteIntent); + }); + } + + private void waitTillAllWriteIntentsGetRemovedOnAllNodes(String tableName) throws InterruptedException { + waitOnAllNodes("Write intents should be removed from all nodes", tableName, storage -> readAll(storage).isEmpty()); + } + + private void waitOnAllNodes(String expectation, String tableName, Predicate<MvPartitionStorage> storageTest) + throws InterruptedException { + for (Node node : cluster) { + InternalTable internalTable = unwrapTableImpl(node.tableManager.table(tableName)).internalTable(); + MvPartitionStorage storage = internalTable.storage().getMvPartition(0); + assertNotNull(storage); + + assertTrue( + waitForCondition(() -> storageTest.test(storage), SECONDS.toMillis(10)), + expectation + ); + } + } + + private static List<ReadResult> readAll(MvPartitionStorage storage) { + try (PartitionTimestampCursor cursor = storage.scan(HybridTimestamp.MAX_VALUE)) { + return cursor.stream().collect(toList()); + } + } + + private void waitTillTxStateAppearsOnAllNodes(int zoneId) throws InterruptedException { + for (Node node : cluster) { + TxStatePartitionStorage txStatePartitionStorage = node.txStatePartitionStorage(zoneId, 0); + + assertTrue( + waitForCondition(() -> !isEmpty(txStatePartitionStorage), SECONDS.toMillis(10)), + "Did not see any tx states on node " + node.name + " in time" + ); + } + } + + private static boolean isEmpty(TxStatePartitionStorage txStatePartitionStorage) { + return bypassingThreadAssertions(() -> { + try (Cursor<IgniteBiTuple<UUID, TxMeta>> cursor = txStatePartitionStorage.scan()) { + return !cursor.hasNext(); + } + }); + } +} diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java index cee674d81ac..0cdfab104bc 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java @@ -19,18 +19,15 @@ package org.apache.ignite.internal.partition.replicator; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.partition.replicator.fixtures.Node; import org.apache.ignite.internal.placementdriver.ReplicaMeta; -import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.tx.Transaction; import org.junit.jupiter.api.Test; +// TODO: remove after switching to per-zone partitions https://issues.apache.org/jira/browse/IGNITE-22522 class ItColocationTxRecoveryTest extends ItAbstractColocationTest { private static final long KEY = 1; @@ -63,7 +60,7 @@ class ItColocationTxRecoveryTest extends ItAbstractColocationTest { putInitialValue(node0); - ReplicaMeta primaryReplica = getPrimaryReplica(zoneId); + ReplicaMeta primaryReplica = node0.getPrimaryReplica(zoneId); Node coordinatorNodeToBeStopped = findAnyOtherNode(primaryReplica); Transaction txToBeAbandoned = coordinatorNodeToBeStopped.transactions().begin(); @@ -90,22 +87,6 @@ class ItColocationTxRecoveryTest extends ItAbstractColocationTest { .put(null, KEY, 42); } - private ReplicaMeta getPrimaryReplica(int zoneId) { - Node node = cluster.get(0); - - CompletableFuture<ReplicaMeta> primaryReplicaFuture = node.placementDriverManager.placementDriver().getPrimaryReplica( - new ZonePartitionId(zoneId, 0), - node.hybridClock.now() - ); - - assertThat(primaryReplicaFuture, willCompleteSuccessfully()); - - ReplicaMeta replicaMeta = primaryReplicaFuture.join(); - assertThat(replicaMeta, is(notNullValue())); - - return replicaMeta; - } - private Node findAnyOtherNode(ReplicaMeta primaryReplica) { return cluster.stream() .filter(node -> !node.name.equals(primaryReplica.getLeaseholder())) diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index a834bba868a..add3bfbfd1f 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -20,15 +20,20 @@ package org.apache.ignite.internal.partition.replicator.fixtures; import static java.util.Collections.reverse; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.BaseIgniteRestartTest.createVault; import static org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.apache.ignite.internal.util.IgniteUtils.stopAsync; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.internal.util.MockUtil.isMock; @@ -46,6 +51,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; import java.util.function.LongSupplier; import org.apache.ignite.internal.app.ThreadPoolsManager; import org.apache.ignite.internal.catalog.CatalogManager; @@ -110,6 +116,8 @@ import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.DefaultMessagingService; +import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.NodeFinder; import org.apache.ignite.internal.network.configuration.NetworkExtensionConfigurationSchema; import org.apache.ignite.internal.network.recovery.InMemoryStaleIds; @@ -120,6 +128,7 @@ import org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycle import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.placementdriver.PlacementDriverManager; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; @@ -953,4 +962,31 @@ public class Node { public TxManager txManager() { return txManager; } + + public void dropMessages(BiPredicate<@Nullable String, NetworkMessage> predicate) { + ((DefaultMessagingService) clusterService.messagingService()).dropMessages(predicate); + } + + /** + * Returns the primary replica for given zone's partition 0. If there is no primary yet, waits for it. + * + * @param zoneId ID of the zone. + */ + public ReplicaMeta getPrimaryReplica(int zoneId) throws InterruptedException { + assertTrue(waitForCondition(() -> getNullablePrimaryReplica(zoneId) != null, SECONDS.toMillis(10))); + + ReplicaMeta primaryReplica = getNullablePrimaryReplica(zoneId); + assertThat(primaryReplica, is(notNullValue())); + return primaryReplica; + } + + private @Nullable ReplicaMeta getNullablePrimaryReplica(int zoneId) { + CompletableFuture<ReplicaMeta> primaryReplicaFuture = placementDriverManager.placementDriver().getPrimaryReplica( + new ZonePartitionId(zoneId, 0), + hybridClock.now() + ); + + assertThat(primaryReplicaFuture, willCompleteSuccessfully()); + return primaryReplicaFuture.join(); + } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java index 39826070580..4a507440f06 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.ClusterNodeResolver; import org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.ReplicaSafeTimeSyncRequestHandler; +import org.apache.ignite.internal.partition.replicator.handlers.TxCleanupRecoveryRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler; @@ -49,6 +50,7 @@ import org.apache.ignite.internal.replicator.message.TableAware; import org.apache.ignite.internal.schema.SchemaSyncService; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; import org.apache.ignite.internal.tx.message.TxRecoveryMessage; import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; @@ -75,13 +77,12 @@ public class ZonePartitionReplicaListener implements ReplicaListener { private final ReplicaPrimacyEngine replicaPrimacyEngine; - private final ReplicationRaftCommandApplicator raftCommandApplicator; - // Replica request handlers. private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler; private final WriteIntentSwitchRequestHandler writeIntentSwitchRequestHandler; private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler; private final TxRecoveryMessageHandler txRecoveryMessageHandler; + private final TxCleanupRecoveryRequestHandler txCleanupRecoveryRequestHandler; private final MinimumActiveTxTimeReplicaRequestHandler minimumActiveTxTimeReplicaRequestHandler; private final VacuumTxStateReplicaRequestHandler vacuumTxStateReplicaRequestHandler; private final ReplicaSafeTimeSyncRequestHandler replicaSafeTimeSyncRequestHandler; @@ -117,7 +118,7 @@ public class ZonePartitionReplicaListener implements ReplicaListener { localNode ); - this.raftCommandApplicator = new ReplicationRaftCommandApplicator(raftClient, replicationGroupId); + ReplicationRaftCommandApplicator raftCommandApplicator = new ReplicationRaftCommandApplicator(raftClient, replicationGroupId); TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine( txManager, @@ -158,6 +159,8 @@ public class ZonePartitionReplicaListener implements ReplicaListener { txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine); + txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(txStatePartitionStorage, txManager, replicationGroupId); + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( clockService, raftCommandApplicator @@ -207,6 +210,8 @@ public class ZonePartitionReplicaListener implements ReplicaListener { return txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest) request); } else if (request instanceof TxRecoveryMessage) { return txRecoveryMessageHandler.handle((TxRecoveryMessage) request, senderId); + } else if (request instanceof TxCleanupRecoveryRequest) { + return txCleanupRecoveryRequestHandler.handle((TxCleanupRecoveryRequest) request); } return processZoneReplicaRequest(request, replicaPrimacy, senderId); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java new file mode 100644 index 00000000000..43cd011d89f --- /dev/null +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.handlers; + +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.internal.util.Cursor; + +/** + * Handler for {@link TxCleanupRecoveryRequest}s. + */ +public class TxCleanupRecoveryRequestHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRecoveryRequestHandler.class); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final TxManager txManager; + private final ReplicationGroupId replicationGroupId; + + /** Constructor. */ + public TxCleanupRecoveryRequestHandler( + TxStatePartitionStorage txStatePartitionStorage, + TxManager txManager, + ReplicationGroupId replicationGroupId + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.txManager = txManager; + this.replicationGroupId = replicationGroupId; + } + + /** + * Handles a {@link TxCleanupRecoveryRequest}. + * + * @param request Request to handle. + * @return Future completed when the request has been handled. + */ + public CompletableFuture<Void> handle(TxCleanupRecoveryRequest request) { + runPersistentStorageScan(); + + return nullCompletedFuture(); + } + + private void runPersistentStorageScan() { + int committedCount = 0; + int abortedCount = 0; + + try (Cursor<IgniteBiTuple<UUID, TxMeta>> txs = txStatePartitionStorage.scan()) { + for (IgniteBiTuple<UUID, TxMeta> tx : txs) { + UUID txId = tx.getKey(); + TxMeta txMeta = tx.getValue(); + + assert !txMeta.enlistedPartitions().isEmpty(); + + assert isFinalState(txMeta.txState()) : "Unexpected state [txId=" + txId + ", state=" + txMeta.txState() + "]."; + + if (txMeta.txState() == COMMITTED) { + committedCount++; + } else { + abortedCount++; + } + + txManager.cleanup( + replicationGroupId, + txMeta.enlistedPartitions(), + txMeta.txState() == COMMITTED, + txMeta.commitTimestamp(), + txId + ).exceptionally(throwable -> { + LOG.warn("Failed to cleanup transaction [txId={}].", throwable, txId); + + return null; + }); + } + } catch (IgniteInternalException e) { + LOG.warn("Failed to scan transaction state storage [commitPartition={}].", e, replicationGroupId); + } + + LOG.debug("Persistent storage scan finished [committed={}, aborted={}].", committedCount, abortedCount); + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 869b6323462..f8f474d9f01 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -111,6 +111,7 @@ import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker; import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator; import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine; import org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler; +import org.apache.ignite.internal.partition.replicator.handlers.TxCleanupRecoveryRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler; @@ -206,7 +207,6 @@ import org.apache.ignite.internal.tx.LockMode; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.UpdateCommandResult; @@ -305,9 +305,6 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr /** Resources registry. */ private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry; - /** Tx state storage. */ - private final TxStatePartitionStorage txStatePartitionStorage; - /** Clock service. */ private final ClockService clockService; @@ -362,6 +359,7 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler; private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler; private final TxRecoveryMessageHandler txRecoveryMessageHandler; + private final TxCleanupRecoveryRequestHandler txCleanupRecoveryRequestHandler; private final MinimumActiveTxTimeReplicaRequestHandler minimumActiveTxTimeReplicaRequestHandler; private final VacuumTxStateReplicaRequestHandler vacuumTxStateReplicaRequestHandler; private final BuildIndexReplicaRequestHandler buildIndexReplicaRequestHandler; @@ -427,7 +425,6 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr this.secondaryIndexStorages = secondaryIndexStorages; this.clockService = clockService; this.safeTime = safeTime; - this.txStatePartitionStorage = txStatePartitionStorage; this.transactionStateResolver = transactionStateResolver; this.storageUpdateHandler = storageUpdateHandler; this.schemaSyncService = schemaSyncService; @@ -474,6 +471,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine); + txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(txStatePartitionStorage, txManager, replicationGroupId); + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( clockService, raftCommandApplicator); @@ -499,47 +498,6 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr return new PendingTxPartitionEnlistment(node.name(), 0L, ((TablePartitionId) replicationGroupId).tableId()); } - private void runPersistentStorageScan() { - int committedCount = 0; - int abortedCount = 0; - - try (Cursor<IgniteBiTuple<UUID, TxMeta>> txs = txStatePartitionStorage.scan()) { - for (IgniteBiTuple<UUID, TxMeta> tx : txs) { - UUID txId = tx.getKey(); - TxMeta txMeta = tx.getValue(); - - assert !txMeta.enlistedPartitions().isEmpty(); - - assert isFinalState(txMeta.txState()) : "Unexpected state [txId=" + txId + ", state=" + txMeta.txState() + "]."; - - if (txMeta.txState() == COMMITTED) { - committedCount++; - } else { - abortedCount++; - } - - assert !enabledColocation() : "Unexpected method call within colocation enabled."; - - txManager.cleanup( - // This method is not called in a colocation context, thus it's valid to cast replicationGroupId to TablePartitionId - (TablePartitionId) replicationGroupId, - txMeta.enlistedPartitions(), - txMeta.txState() == COMMITTED, - txMeta.commitTimestamp(), - txId - ).exceptionally(throwable -> { - LOG.warn("Failed to cleanup transaction [txId={}].", throwable, txId); - - return null; - }); - } - } catch (IgniteInternalException e) { - LOG.warn("Failed to scan transaction state storage [commitPartition={}].", e, replicationGroupId); - } - - LOG.debug("Persistent storage scan finished [committed={}, aborted={}].", committedCount, abortedCount); - } - @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { return replicaPrimacyEngine.validatePrimacy(request) @@ -609,7 +567,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr } if (request instanceof TxCleanupRecoveryRequest) { - return processCleanupRecoveryMessage((TxCleanupRecoveryRequest) request); + assert !enabledColocation() : "Unexpected method call within colocation enabled."; + + return txCleanupRecoveryRequestHandler.handle((TxCleanupRecoveryRequest) request); } if (request instanceof GetEstimatedSizeRequest) { @@ -661,12 +621,6 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr return completedFuture(mvDataStorage.estimatedSize()); } - private CompletableFuture<Void> processCleanupRecoveryMessage(TxCleanupRecoveryRequest request) { - runPersistentStorageScan(); - - return nullCompletedFuture(); - } - private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) { TablePartitionId replicaGrpId = (TablePartitionId) request.groupId().asReplicationGroupId(); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index e0a3f29c8e4..c1976c54d3e 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -26,7 +26,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -200,7 +199,7 @@ public interface TxManager extends IgniteComponent { * @return Completable future of Void. */ CompletableFuture<Void> cleanup( - TablePartitionId commitPartitionId, + ReplicationGroupId commitPartitionId, Collection<EnlistedPartitionGroup> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java index e80bebec19a..cce75e9926f 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; import org.apache.ignite.internal.tx.TransactionResult; @@ -122,7 +121,7 @@ public class TransactionInflights { txCtxMap.keySet().removeAll(txIds); } - void cancelWaitingInflights(TablePartitionId groupId) { + void cancelWaitingInflights(ReplicationGroupId groupId) { for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) { if (ctxEntry.getValue() instanceof ReadWriteTxContext) { ReadWriteTxContext txContext = (ReadWriteTxContext) ctxEntry.getValue(); @@ -314,7 +313,7 @@ public class TransactionInflights { return waitRepFut; } - void cancelWaitingInflights(TablePartitionId groupId, long enlistmentConsistencyToken) { + void cancelWaitingInflights(ReplicationGroupId groupId, long enlistmentConsistencyToken) { waitRepFut.completeExceptionally(new PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null, null)); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index e401a21dce1..9d62e0a6677 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -358,21 +358,19 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi new TxCleanupRequestSender(txMessageSender, placementDriverHelper, txStateVolatileStorage); } - // TODO: IGNITE-24363 - change action argument type to ReplicaGroupId. private CompletableFuture<Boolean> primaryReplicaEventListener( PrimaryReplicaEventParameters eventParameters, - Consumer<TablePartitionId> action + Consumer<ReplicationGroupId> action ) { return inBusyLock(busyLock, () -> { assertReplicationGroupType(eventParameters.groupId()); - if (!(eventParameters.groupId() instanceof TablePartitionId)) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 - remove check for TablePartitionId. + if (!(eventParameters.groupId() instanceof TablePartitionId) && !(eventParameters.groupId() instanceof ZonePartitionId)) { return falseCompletedFuture(); } - TablePartitionId groupId = (TablePartitionId) eventParameters.groupId(); - - action.accept(groupId); + action.accept(eventParameters.groupId()); return falseCompletedFuture(); }); @@ -973,7 +971,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi @Override public CompletableFuture<Void> cleanup( - TablePartitionId commitPartitionId, + ReplicationGroupId commitPartitionId, Collection<EnlistedPartitionGroup> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java index 100d38a8ff6..49968b68e88 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.tx.impl; import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage; -import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; import java.util.ArrayList; import java.util.Collection; @@ -33,7 +32,6 @@ import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaResponse; import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; @@ -240,14 +238,14 @@ public class TxMessageSender { * Send TxCleanupRecoveryRequest. * * @param primaryConsistentId Node id to send the request to. - * @param tablePartitionId Table partition id. + * @param replicationGroupId Replication group ID corresponding to a partition. * @return Completable future of ReplicaResponse. */ - public CompletableFuture<ReplicaResponse> sendRecoveryCleanup(String primaryConsistentId, TablePartitionId tablePartitionId) { + public CompletableFuture<ReplicaResponse> sendRecoveryCleanup(String primaryConsistentId, ReplicationGroupId replicationGroupId) { return replicaService.invoke( primaryConsistentId, TX_MESSAGES_FACTORY.txCleanupRecoveryRequest() - .groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId)) + .groupId(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId)) .build() ); }