This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 33e83aa9f5 IGNITE-17578 Transactions: async cleanup processing on tx commit (#2529) 33e83aa9f5 is described below commit 33e83aa9f5b1a0fd50e6679129d526772d74b985 Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Thu Sep 7 18:19:22 2023 +0300 IGNITE-17578 Transactions: async cleanup processing on tx commit (#2529) --- ...ItTxDistributedTestThreeNodesThreeReplicas.java | 8 +++++++ ...butedTestThreeNodesThreeReplicasCollocated.java | 8 +++++++ .../replicator/PartitionReplicaListener.java | 27 +++++++++++++++------- .../apache/ignite/distributed/ItTxTestCluster.java | 8 +++---- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java index 71dbcb273a..966ae8cacb 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.TestInfo; /** @@ -49,6 +50,13 @@ public class ItTxDistributedTestThreeNodesThreeReplicas extends ItTxDistributedT return 3; } + /** {@inheritDoc} */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20116") + @Override + public void testBalance() throws InterruptedException { + super.testBalance(); + } + @Override @AfterEach public void after() throws Exception { diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java index 79a6080e6b..e128acac09 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -46,6 +47,13 @@ public class ItTxDistributedTestThreeNodesThreeReplicasCollocated extends ItTxDi return false; } + /** {@inheritDoc} */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20116") + @Override + public void testBalance() throws InterruptedException { + super.testBalance(); + } + /** {@inheritDoc} */ @BeforeEach @Override public void before() throws Exception { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index ee9f8de59a..9d5c3faab0 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -69,6 +69,8 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.service.RaftGroupService; @@ -158,6 +160,9 @@ import org.jetbrains.annotations.Nullable; /** Partition replication listener. */ public class PartitionReplicaListener implements ReplicaListener { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class); + /** Factory to create RAFT command messages. */ private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); @@ -1255,9 +1260,10 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Processes transaction cleanup request: * <ol> - * <li>Run specific raft {@code TxCleanupCommand} command, that will convert all pending entries(writeIntents) - * to either regular values({@link TxState#COMMITED}) or removing them ({@link TxState#ABORTED}).</li> - * <li>Release all locks that were held on local Replica by given transaction.</li> + * <li>Waits for finishing of local transactional operations;</li> + * <li>Runs asynchronously the specific raft {@code TxCleanupCommand} command, that will convert all pending entries(writeIntents) + * to either regular values({@link TxState#COMMITED}) or removing them ({@link TxState#ABORTED});</li> + * <li>Releases all locks that were held on local Replica by given transaction.</li> * </ol> * This operation is idempotent, so it's safe to retry it. * @@ -1302,7 +1308,7 @@ public class PartitionReplicaListener implements ReplicaListener { if (txUpdateFutures.isEmpty()) { if (!txReadFutures.isEmpty()) { - allOffFuturesExceptionIgnored(txReadFutures, request) + return allOffFuturesExceptionIgnored(txReadFutures, request) .thenRun(() -> releaseTxLocks(request.txId())); } @@ -1325,10 +1331,15 @@ public class PartitionReplicaListener implements ReplicaListener { storageUpdateHandler.handleTransactionCleanup(request.txId(), request.commit(), request.commitTimestamp()); - return raftClient - .run(txCleanupCmd) - .thenCompose(ignored -> allOffFuturesExceptionIgnored(txReadFutures, request) - .thenRun(() -> releaseTxLocks(request.txId()))); + raftClient.run(txCleanupCmd) + .exceptionally(e -> { + LOG.warn("Failed to complete transaction cleanup command [txId=" + request.txId() + ']', e); + + return completedFuture(null); + }); + + return allOffFuturesExceptionIgnored(txReadFutures, request) + .thenRun(() -> releaseTxLocks(request.txId())); }); }); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 3d62ed1d4e..3fed2fe27d 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -387,8 +387,10 @@ public class ItTxTestCluster { TablePartitionId grpId = grpIds.get(p); for (String assignment : partAssignments) { + int partId = p; + var mvTableStorage = new TestMvTableStorage(tableId, DEFAULT_PARTITION_COUNT); - var mvPartStorage = new TestMvPartitionStorage(0); + var mvPartStorage = new TestMvPartitionStorage(partId); var txStateStorage = txStateStorages.get(assignment); var placementDriver = new PlacementDriver(replicaServices.get(assignment), consistentIdToNode); @@ -396,8 +398,6 @@ public class ItTxTestCluster { placementDriver.updateAssignment(grpIds.get(part), assignments.get(part)); } - int partId = p; - int indexId = globalIndexId++; ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor); @@ -438,8 +438,6 @@ public class ItTxTestCluster { new RaftGroupEventsClientListener() ); - TxManager txManager = txManagers.get(assignment); - PartitionListener partitionListener = new PartitionListener( txManagers.get(assignment), partitionDataStorage,