This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 33e83aa9f5 IGNITE-17578 Transactions: async cleanup processing on tx 
commit (#2529)
33e83aa9f5 is described below

commit 33e83aa9f5b1a0fd50e6679129d526772d74b985
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Thu Sep 7 18:19:22 2023 +0300

    IGNITE-17578 Transactions: async cleanup processing on tx commit (#2529)
---
 ...ItTxDistributedTestThreeNodesThreeReplicas.java |  8 +++++++
 ...butedTestThreeNodesThreeReplicasCollocated.java |  8 +++++++
 .../replicator/PartitionReplicaListener.java       | 27 +++++++++++++++-------
 .../apache/ignite/distributed/ItTxTestCluster.java |  8 +++----
 4 files changed, 38 insertions(+), 13 deletions(-)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
index 71dbcb273a..966ae8cacb 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestInfo;
 
 /**
@@ -49,6 +50,13 @@ public class ItTxDistributedTestThreeNodesThreeReplicas 
extends ItTxDistributedT
         return 3;
     }
 
+    /** {@inheritDoc} */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20116";)
+    @Override
+    public void testBalance() throws InterruptedException {
+        super.testBalance();
+    }
+
     @Override
     @AfterEach
     public void after() throws Exception {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
index 79a6080e6b..e128acac09 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
@@ -46,6 +47,13 @@ public class 
ItTxDistributedTestThreeNodesThreeReplicasCollocated extends ItTxDi
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20116";)
+    @Override
+    public void testBalance() throws InterruptedException {
+        super.testBalance();
+    }
+
     /** {@inheritDoc} */
     @BeforeEach
     @Override public void before() throws Exception {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ee9f8de59a..9d5c3faab0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -69,6 +69,8 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -158,6 +160,9 @@ import org.jetbrains.annotations.Nullable;
 
 /** Partition replication listener. */
 public class PartitionReplicaListener implements ReplicaListener {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(PartitionReplicaListener.class);
+
     /** Factory to create RAFT command messages. */
     private static final TableMessagesFactory MSG_FACTORY = new 
TableMessagesFactory();
 
@@ -1255,9 +1260,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /**
      * Processes transaction cleanup request:
      * <ol>
-     *     <li>Run specific raft {@code TxCleanupCommand} command, that will 
convert all pending entries(writeIntents)
-     *     to either regular values({@link TxState#COMMITED}) or removing them 
({@link TxState#ABORTED}).</li>
-     *     <li>Release all locks that were held on local Replica by given 
transaction.</li>
+     *     <li>Waits for finishing of local transactional operations;</li>
+     *     <li>Runs asynchronously the specific raft {@code TxCleanupCommand} 
command, that will convert all pending entries(writeIntents)
+     *     to either regular values({@link TxState#COMMITED}) or removing them 
({@link TxState#ABORTED});</li>
+     *     <li>Releases all locks that were held on local Replica by given 
transaction.</li>
      * </ol>
      * This operation is idempotent, so it's safe to retry it.
      *
@@ -1302,7 +1308,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         if (txUpdateFutures.isEmpty()) {
             if (!txReadFutures.isEmpty()) {
-                allOffFuturesExceptionIgnored(txReadFutures, request)
+                return allOffFuturesExceptionIgnored(txReadFutures, request)
                         .thenRun(() -> releaseTxLocks(request.txId()));
             }
 
@@ -1325,10 +1331,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                         
storageUpdateHandler.handleTransactionCleanup(request.txId(), request.commit(), 
request.commitTimestamp());
 
-                        return raftClient
-                                .run(txCleanupCmd)
-                                .thenCompose(ignored -> 
allOffFuturesExceptionIgnored(txReadFutures, request)
-                                        .thenRun(() -> 
releaseTxLocks(request.txId())));
+                        raftClient.run(txCleanupCmd)
+                                .exceptionally(e -> {
+                                    LOG.warn("Failed to complete transaction 
cleanup command [txId=" + request.txId() + ']', e);
+
+                                    return completedFuture(null);
+                                });
+
+                        return allOffFuturesExceptionIgnored(txReadFutures, 
request)
+                                .thenRun(() -> releaseTxLocks(request.txId()));
                     });
         });
     }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 3d62ed1d4e..3fed2fe27d 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -387,8 +387,10 @@ public class ItTxTestCluster {
             TablePartitionId grpId = grpIds.get(p);
 
             for (String assignment : partAssignments) {
+                int partId = p;
+
                 var mvTableStorage = new TestMvTableStorage(tableId, 
DEFAULT_PARTITION_COUNT);
-                var mvPartStorage = new TestMvPartitionStorage(0);
+                var mvPartStorage = new TestMvPartitionStorage(partId);
                 var txStateStorage = txStateStorages.get(assignment);
                 var placementDriver = new 
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
 
@@ -396,8 +398,6 @@ public class ItTxTestCluster {
                     placementDriver.updateAssignment(grpIds.get(part), 
assignments.get(part));
                 }
 
-                int partId = p;
-
                 int indexId = globalIndexId++;
 
                 ColumnsExtractor row2Tuple = 
BinaryRowConverter.keyExtractor(schemaDescriptor);
@@ -438,8 +438,6 @@ public class ItTxTestCluster {
                         new RaftGroupEventsClientListener()
                 );
 
-                TxManager txManager = txManagers.get(assignment);
-
                 PartitionListener partitionListener = new PartitionListener(
                         txManagers.get(assignment),
                         partitionDataStorage,

Reply via email to