This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a20d6eb81a0 Implement TxCleanupRecoveryRequest processing for zone replica (#5371)
a20d6eb81a0 is described below

commit a20d6eb81a0b7c5689cd25c843880b10b69e2eca
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Mon Mar 10 19:15:49 2025 +0400

    Implement TxCleanupRecoveryRequest processing for zone replica (#5371)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |   2 +-
 .../ItColocationTxCleanupRecoveryTest.java         | 141 +++++++++++++++++++++
 .../replicator/ItColocationTxRecoveryTest.java     |  23 +---
 .../partition/replicator/fixtures/Node.java        |  36 ++++++
 .../replicator/ZonePartitionReplicaListener.java   |  11 +-
 .../handlers/TxCleanupRecoveryRequestHandler.java  | 107 ++++++++++++++++
 .../replicator/PartitionReplicaListener.java       |  60 +--------
 .../org/apache/ignite/internal/tx/TxManager.java   |   3 +-
 .../internal/tx/impl/TransactionInflights.java     |   5 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  12 +-
 .../ignite/internal/tx/impl/TxMessageSender.java   |   8 +-
 11 files changed, 313 insertions(+), 95 deletions(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 6ef2392808d..faa6f1e305f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -226,7 +226,7 @@ public class FakeTxManager implements TxManager {
 
     @Override
     public CompletableFuture<Void> cleanup(
-            TablePartitionId commitPartitionId,
+            ReplicationGroupId commitPartitionId,
             Collection<EnlistedPartitionGroup> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxCleanupRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxCleanupRecoveryTest.java
new file mode 100644
index 00000000000..94b0a65e57b
--- /dev/null
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxCleanupRecoveryTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxCleanupMessage;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.Test;
+
+// TODO: remove after switching to per-zone partitions 
https://issues.apache.org/jira/browse/IGNITE-22522
+class ItColocationTxCleanupRecoveryTest extends ItAbstractColocationTest {
+    @Test
+    void txGetsCleanedUpOnPrimaryChange() throws Exception {
+        startCluster(3);
+
+        Node node0 = cluster.get(0);
+
+        // Create a zone with a single partition on every node.
+        int zoneId = createZone(node0, TEST_ZONE_NAME, 1, cluster.size());
+
+        createTable(node0, TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        cluster.forEach(Node::waitForMetadataCompletenessAtNow);
+
+        ReplicaMeta primaryReplica = node0.getPrimaryReplica(zoneId);
+        Node primaryReplicaNode = cluster.stream()
+                .filter(node -> 
node.name.equals(primaryReplica.getLeaseholder()))
+                .findAny().orElseThrow();
+
+        // We'll use this node as transaction coordinator, so let's forbid it 
sending cleanup messages.
+        disallowTxCleanupMessagesFrom(primaryReplicaNode);
+
+        KeyValueView<Long, Integer> kvView = node0.tableManager
+                .table(TEST_TABLE_NAME1)
+                .keyValueView(Long.class, Integer.class);
+
+        Transaction transaction = primaryReplicaNode.transactions().begin();
+
+        kvView.put(transaction, 42L, 69);
+        waitTillOneWriteIntentAppearsOnAllNodes(TEST_TABLE_NAME1);
+        transaction.rollbackAsync();
+
+        waitTillTxStateAppearsOnAllNodes(zoneId);
+
+        primaryReplicaNode.stop();
+        cluster.remove(primaryReplicaNode);
+
+        waitTillAllWriteIntentsGetRemovedOnAllNodes(TEST_TABLE_NAME1);
+    }
+
+    private static void disallowTxCleanupMessagesFrom(Node primaryReplicaNode) 
{
+        primaryReplicaNode.dropMessages((destinationName, message) -> message 
instanceof TxCleanupMessage);
+    }
+
+    private void waitTillOneWriteIntentAppearsOnAllNodes(String tableName) 
throws InterruptedException {
+        waitOnAllNodes("A write intent should appear on every node", 
tableName, storage -> {
+            List<ReadResult> readResults = readAll(storage);
+            return readResults.size() == 1 && 
readResults.stream().allMatch(ReadResult::isWriteIntent);
+        });
+    }
+
+    private void waitTillAllWriteIntentsGetRemovedOnAllNodes(String tableName) 
throws InterruptedException {
+        waitOnAllNodes("Write intents should be removed from all nodes", 
tableName, storage -> readAll(storage).isEmpty());
+    }
+
+    private void waitOnAllNodes(String expectation, String tableName, 
Predicate<MvPartitionStorage> storageTest)
+            throws InterruptedException {
+        for (Node node : cluster) {
+            InternalTable internalTable = 
unwrapTableImpl(node.tableManager.table(tableName)).internalTable();
+            MvPartitionStorage storage = 
internalTable.storage().getMvPartition(0);
+            assertNotNull(storage);
+
+            assertTrue(
+                    waitForCondition(() -> storageTest.test(storage), 
SECONDS.toMillis(10)),
+                    expectation
+            );
+        }
+    }
+
+    private static List<ReadResult> readAll(MvPartitionStorage storage) {
+        try (PartitionTimestampCursor cursor = 
storage.scan(HybridTimestamp.MAX_VALUE)) {
+            return cursor.stream().collect(toList());
+        }
+    }
+
+    private void waitTillTxStateAppearsOnAllNodes(int zoneId) throws 
InterruptedException {
+        for (Node node : cluster) {
+            TxStatePartitionStorage txStatePartitionStorage = 
node.txStatePartitionStorage(zoneId, 0);
+
+            assertTrue(
+                    waitForCondition(() -> !isEmpty(txStatePartitionStorage), 
SECONDS.toMillis(10)),
+                    "Did not see any tx states on node " + node.name + " in 
time"
+            );
+        }
+    }
+
+    private static boolean isEmpty(TxStatePartitionStorage 
txStatePartitionStorage) {
+        return bypassingThreadAssertions(() -> {
+            try (Cursor<IgniteBiTuple<UUID, TxMeta>> cursor = 
txStatePartitionStorage.scan()) {
+                return !cursor.hasNext();
+            }
+        });
+    }
+}
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java
index cee674d81ac..0cdfab104bc 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java
@@ -19,18 +19,15 @@ package org.apache.ignite.internal.partition.replicator;
 
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.partition.replicator.fixtures.Node;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.Test;
 
+// TODO: remove after switching to per-zone partitions 
https://issues.apache.org/jira/browse/IGNITE-22522
 class ItColocationTxRecoveryTest extends ItAbstractColocationTest {
     private static final long KEY = 1;
 
@@ -63,7 +60,7 @@ class ItColocationTxRecoveryTest extends 
ItAbstractColocationTest {
 
         putInitialValue(node0);
 
-        ReplicaMeta primaryReplica = getPrimaryReplica(zoneId);
+        ReplicaMeta primaryReplica = node0.getPrimaryReplica(zoneId);
 
         Node coordinatorNodeToBeStopped = findAnyOtherNode(primaryReplica);
         Transaction txToBeAbandoned = 
coordinatorNodeToBeStopped.transactions().begin();
@@ -90,22 +87,6 @@ class ItColocationTxRecoveryTest extends 
ItAbstractColocationTest {
                 .put(null, KEY, 42);
     }
 
-    private ReplicaMeta getPrimaryReplica(int zoneId) {
-        Node node = cluster.get(0);
-
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
node.placementDriverManager.placementDriver().getPrimaryReplica(
-                new ZonePartitionId(zoneId, 0),
-                node.hybridClock.now()
-        );
-
-        assertThat(primaryReplicaFuture, willCompleteSuccessfully());
-
-        ReplicaMeta replicaMeta = primaryReplicaFuture.join();
-        assertThat(replicaMeta, is(notNullValue()));
-
-        return replicaMeta;
-    }
-
     private Node findAnyOtherNode(ReplicaMeta primaryReplica) {
         return cluster.stream()
                 .filter(node -> 
!node.name.equals(primaryReplica.getLeaseholder()))
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index a834bba868a..add3bfbfd1f 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -20,15 +20,20 @@ package 
org.apache.ignite.internal.partition.replicator.fixtures;
 import static java.util.Collections.reverse;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.BaseIgniteRestartTest.createVault;
 import static 
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.internal.util.MockUtil.isMock;
@@ -46,6 +51,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
 import java.util.function.LongSupplier;
 import org.apache.ignite.internal.app.ThreadPoolsManager;
 import org.apache.ignite.internal.catalog.CatalogManager;
@@ -110,6 +116,8 @@ import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.DefaultMessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NodeFinder;
 import 
org.apache.ignite.internal.network.configuration.NetworkExtensionConfigurationSchema;
 import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
@@ -120,6 +128,7 @@ import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycle
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
@@ -953,4 +962,31 @@ public class Node {
     public TxManager txManager() {
         return txManager;
     }
+
+    public void dropMessages(BiPredicate<@Nullable String, NetworkMessage> 
predicate) {
+        ((DefaultMessagingService) 
clusterService.messagingService()).dropMessages(predicate);
+    }
+
+    /**
+     * Returns the primary replica for given zone's partition 0. If there is 
no primary yet, waits for it.
+     *
+     * @param zoneId ID of the zone.
+     */
+    public ReplicaMeta getPrimaryReplica(int zoneId) throws 
InterruptedException {
+        assertTrue(waitForCondition(() -> getNullablePrimaryReplica(zoneId) != 
null, SECONDS.toMillis(10)));
+
+        ReplicaMeta primaryReplica = getNullablePrimaryReplica(zoneId);
+        assertThat(primaryReplica, is(notNullValue()));
+        return primaryReplica;
+    }
+
+    private @Nullable ReplicaMeta getNullablePrimaryReplica(int zoneId) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriverManager.placementDriver().getPrimaryReplica(
+                new ZonePartitionId(zoneId, 0),
+                hybridClock.now()
+        );
+
+        assertThat(primaryReplicaFuture, willCompleteSuccessfully());
+        return primaryReplicaFuture.join();
+    }
 }
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 39826070580..4a507440f06 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import 
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.ReplicaSafeTimeSyncRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.handlers.TxCleanupRecoveryRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler;
@@ -49,6 +50,7 @@ import 
org.apache.ignite.internal.replicator.message.TableAware;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
@@ -75,13 +77,12 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
     private final ReplicaPrimacyEngine replicaPrimacyEngine;
 
-    private final ReplicationRaftCommandApplicator raftCommandApplicator;
-
     // Replica request handlers.
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
     private final WriteIntentSwitchRequestHandler 
writeIntentSwitchRequestHandler;
     private final TxStateCommitPartitionReplicaRequestHandler 
txStateCommitPartitionReplicaRequestHandler;
     private final TxRecoveryMessageHandler txRecoveryMessageHandler;
+    private final TxCleanupRecoveryRequestHandler 
txCleanupRecoveryRequestHandler;
     private final MinimumActiveTxTimeReplicaRequestHandler 
minimumActiveTxTimeReplicaRequestHandler;
     private final VacuumTxStateReplicaRequestHandler 
vacuumTxStateReplicaRequestHandler;
     private final ReplicaSafeTimeSyncRequestHandler 
replicaSafeTimeSyncRequestHandler;
@@ -117,7 +118,7 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
                 localNode
         );
 
-        this.raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
+        ReplicationRaftCommandApplicator raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
 
         TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
                 txManager,
@@ -158,6 +159,8 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
         txRecoveryMessageHandler = new 
TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, 
txRecoveryEngine);
 
+        txCleanupRecoveryRequestHandler = new 
TxCleanupRecoveryRequestHandler(txStatePartitionStorage, txManager, 
replicationGroupId);
+
         minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
                 clockService,
                 raftCommandApplicator
@@ -207,6 +210,8 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
             return 
txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)
 request);
         } else if (request instanceof TxRecoveryMessage) {
             return txRecoveryMessageHandler.handle((TxRecoveryMessage) 
request, senderId);
+        } else if (request instanceof TxCleanupRecoveryRequest) {
+            return 
txCleanupRecoveryRequestHandler.handle((TxCleanupRecoveryRequest) request);
         }
 
         return processZoneReplicaRequest(request, replicaPrimacy, senderId);
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
new file mode 100644
index 00000000000..43cd011d89f
--- /dev/null
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.handlers;
+
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Handler for {@link TxCleanupRecoveryRequest}s.
+ */
+public class TxCleanupRecoveryRequestHandler {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxCleanupRecoveryRequestHandler.class);
+
+    private final TxStatePartitionStorage txStatePartitionStorage;
+    private final TxManager txManager;
+    private final ReplicationGroupId replicationGroupId;
+
+    /** Constructor. */
+    public TxCleanupRecoveryRequestHandler(
+            TxStatePartitionStorage txStatePartitionStorage,
+            TxManager txManager,
+            ReplicationGroupId replicationGroupId
+    ) {
+        this.txStatePartitionStorage = txStatePartitionStorage;
+        this.txManager = txManager;
+        this.replicationGroupId = replicationGroupId;
+    }
+
+    /**
+     * Handles a {@link TxCleanupRecoveryRequest}.
+     *
+     * @param request Request to handle.
+     * @return Future completed when the request has been handled.
+     */
+    public CompletableFuture<Void> handle(TxCleanupRecoveryRequest request) {
+        runPersistentStorageScan();
+
+        return nullCompletedFuture();
+    }
+
+    private void runPersistentStorageScan() {
+        int committedCount = 0;
+        int abortedCount = 0;
+
+        try (Cursor<IgniteBiTuple<UUID, TxMeta>> txs = 
txStatePartitionStorage.scan()) {
+            for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+                UUID txId = tx.getKey();
+                TxMeta txMeta = tx.getValue();
+
+                assert !txMeta.enlistedPartitions().isEmpty();
+
+                assert isFinalState(txMeta.txState()) : "Unexpected state 
[txId=" + txId + ", state=" + txMeta.txState() + "].";
+
+                if (txMeta.txState() == COMMITTED) {
+                    committedCount++;
+                } else {
+                    abortedCount++;
+                }
+
+                txManager.cleanup(
+                        replicationGroupId,
+                        txMeta.enlistedPartitions(),
+                        txMeta.txState() == COMMITTED,
+                        txMeta.commitTimestamp(),
+                        txId
+                ).exceptionally(throwable -> {
+                    LOG.warn("Failed to cleanup transaction [txId={}].", 
throwable, txId);
+
+                    return null;
+                });
+            }
+        } catch (IgniteInternalException e) {
+            LOG.warn("Failed to scan transaction state storage 
[commitPartition={}].", e, replicationGroupId);
+        }
+
+        LOG.debug("Persistent storage scan finished [committed={}, 
aborted={}].", committedCount, abortedCount);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 869b6323462..f8f474d9f01 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -111,6 +111,7 @@ import 
org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
 import 
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.handlers.TxCleanupRecoveryRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler;
@@ -206,7 +207,6 @@ import org.apache.ignite.internal.tx.LockMode;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.UpdateCommandResult;
@@ -305,9 +305,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
     /** Resources registry. */
     private final RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry;
 
-    /** Tx state storage. */
-    private final TxStatePartitionStorage txStatePartitionStorage;
-
     /** Clock service. */
     private final ClockService clockService;
 
@@ -362,6 +359,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
     private final TxStateCommitPartitionReplicaRequestHandler 
txStateCommitPartitionReplicaRequestHandler;
     private final TxRecoveryMessageHandler txRecoveryMessageHandler;
+    private final TxCleanupRecoveryRequestHandler 
txCleanupRecoveryRequestHandler;
     private final MinimumActiveTxTimeReplicaRequestHandler 
minimumActiveTxTimeReplicaRequestHandler;
     private final VacuumTxStateReplicaRequestHandler 
vacuumTxStateReplicaRequestHandler;
     private final BuildIndexReplicaRequestHandler 
buildIndexReplicaRequestHandler;
@@ -427,7 +425,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         this.secondaryIndexStorages = secondaryIndexStorages;
         this.clockService = clockService;
         this.safeTime = safeTime;
-        this.txStatePartitionStorage = txStatePartitionStorage;
         this.transactionStateResolver = transactionStateResolver;
         this.storageUpdateHandler = storageUpdateHandler;
         this.schemaSyncService = schemaSyncService;
@@ -474,6 +471,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
         txRecoveryMessageHandler = new 
TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, 
txRecoveryEngine);
 
+        txCleanupRecoveryRequestHandler = new 
TxCleanupRecoveryRequestHandler(txStatePartitionStorage, txManager, 
replicationGroupId);
+
         minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
                 clockService,
                 raftCommandApplicator);
@@ -499,47 +498,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         return new PendingTxPartitionEnlistment(node.name(), 0L, 
((TablePartitionId) replicationGroupId).tableId());
     }
 
-    private void runPersistentStorageScan() {
-        int committedCount = 0;
-        int abortedCount = 0;
-
-        try (Cursor<IgniteBiTuple<UUID, TxMeta>> txs = 
txStatePartitionStorage.scan()) {
-            for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
-                UUID txId = tx.getKey();
-                TxMeta txMeta = tx.getValue();
-
-                assert !txMeta.enlistedPartitions().isEmpty();
-
-                assert isFinalState(txMeta.txState()) : "Unexpected state 
[txId=" + txId + ", state=" + txMeta.txState() + "].";
-
-                if (txMeta.txState() == COMMITTED) {
-                    committedCount++;
-                } else {
-                    abortedCount++;
-                }
-
-                assert !enabledColocation() : "Unexpected method call within 
colocation enabled.";
-
-                txManager.cleanup(
-                        // This method is not called in a colocation context, 
thus it's valid to cast replicationGroupId to TablePartitionId
-                        (TablePartitionId) replicationGroupId,
-                        txMeta.enlistedPartitions(),
-                        txMeta.txState() == COMMITTED,
-                        txMeta.commitTimestamp(),
-                        txId
-                ).exceptionally(throwable -> {
-                    LOG.warn("Failed to cleanup transaction [txId={}].", 
throwable, txId);
-
-                    return null;
-                });
-            }
-        } catch (IgniteInternalException e) {
-            LOG.warn("Failed to scan transaction state storage 
[commitPartition={}].", e, replicationGroupId);
-        }
-
-        LOG.debug("Persistent storage scan finished [committed={}, 
aborted={}].", committedCount, abortedCount);
-    }
-
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
         return replicaPrimacyEngine.validatePrimacy(request)
@@ -609,7 +567,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
         }
 
         if (request instanceof TxCleanupRecoveryRequest) {
-            return processCleanupRecoveryMessage((TxCleanupRecoveryRequest) request);
+            assert !enabledColocation() : "Unexpected method call within colocation enabled.";
+
+            return txCleanupRecoveryRequestHandler.handle((TxCleanupRecoveryRequest) request);
         }
 
         if (request instanceof GetEstimatedSizeRequest) {
@@ -661,12 +621,6 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
         return completedFuture(mvDataStorage.estimatedSize());
     }
 
-    private CompletableFuture<Void> processCleanupRecoveryMessage(TxCleanupRecoveryRequest request) {
-        runPersistentStorageScan();
-
-        return nullCompletedFuture();
-    }
-
     private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) {
         TablePartitionId replicaGrpId = (TablePartitionId) request.groupId().asReplicationGroupId();
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index e0a3f29c8e4..c1976c54d3e 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -26,7 +26,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -200,7 +199,7 @@ public interface TxManager extends IgniteComponent {
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
-            TablePartitionId commitPartitionId,
+            ReplicationGroupId commitPartitionId,
             Collection<EnlistedPartitionGroup> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index e80bebec19a..cce75e9926f 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TransactionResult;
@@ -122,7 +121,7 @@ public class TransactionInflights {
         txCtxMap.keySet().removeAll(txIds);
     }
 
-    void cancelWaitingInflights(TablePartitionId groupId) {
+    void cancelWaitingInflights(ReplicationGroupId groupId) {
         for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) {
             if (ctxEntry.getValue() instanceof ReadWriteTxContext) {
                 ReadWriteTxContext txContext = (ReadWriteTxContext) ctxEntry.getValue();
@@ -314,7 +313,7 @@ public class TransactionInflights {
             return waitRepFut;
         }
 
-        void cancelWaitingInflights(TablePartitionId groupId, long enlistmentConsistencyToken) {
+        void cancelWaitingInflights(ReplicationGroupId groupId, long enlistmentConsistencyToken) {
             waitRepFut.completeExceptionally(new PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null, null));
         }
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index e401a21dce1..9d62e0a6677 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -358,21 +358,19 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
                 new TxCleanupRequestSender(txMessageSender, placementDriverHelper, txStateVolatileStorage);
     }
 
-    // TODO: IGNITE-24363 - change action argument type to ReplicaGroupId.
     private CompletableFuture<Boolean> primaryReplicaEventListener(
             PrimaryReplicaEventParameters eventParameters,
-            Consumer<TablePartitionId> action
+            Consumer<ReplicationGroupId> action
     ) {
         return inBusyLock(busyLock, () -> {
             assertReplicationGroupType(eventParameters.groupId());
 
-            if (!(eventParameters.groupId() instanceof TablePartitionId)) {
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 - remove check for TablePartitionId.
+            if (!(eventParameters.groupId() instanceof TablePartitionId) && !(eventParameters.groupId() instanceof ZonePartitionId)) {
                 return falseCompletedFuture();
             }
 
-            TablePartitionId groupId = (TablePartitionId) eventParameters.groupId();
-
-            action.accept(groupId);
+            action.accept(eventParameters.groupId());
 
             return falseCompletedFuture();
         });
@@ -973,7 +971,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
 
     @Override
     public CompletableFuture<Void> cleanup(
-            TablePartitionId commitPartitionId,
+            ReplicationGroupId commitPartitionId,
             Collection<EnlistedPartitionGroup> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 100d38a8ff6..49968b68e88 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
-import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -33,7 +32,6 @@ import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
@@ -240,14 +238,14 @@ public class TxMessageSender {
      * Send TxCleanupRecoveryRequest.
      *
      * @param primaryConsistentId Node id to send the request to.
-     * @param tablePartitionId Table partition id.
+     * @param replicationGroupId Replication group ID corresponding to a partition.
      * @return Completable future of ReplicaResponse.
      */
-    public CompletableFuture<ReplicaResponse> sendRecoveryCleanup(String primaryConsistentId, TablePartitionId tablePartitionId) {
+    public CompletableFuture<ReplicaResponse> sendRecoveryCleanup(String primaryConsistentId, ReplicationGroupId replicationGroupId) {
         return replicaService.invoke(
                 primaryConsistentId,
                 TX_MESSAGES_FACTORY.txCleanupRecoveryRequest()
-                        .groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId))
+                        .groupId(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId))
                         .build()
         );
     }


Reply via email to