This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 38d3c20693 IGNITE-21578 Improve tests readability and logic in 
ItDurableFinishTest. (#3400)
38d3c20693 is described below

commit 38d3c2069392d8069a888363a70cd66fce6f548c
Author: Cyrill <cyrill.si...@gmail.com>
AuthorDate: Thu Mar 14 15:38:48 2024 +0300

    IGNITE-21578 Improve tests readability and logic in ItDurableFinishTest. 
(#3400)
---
 .../ignite/internal/table/ItDurableFinishTest.java | 138 +++++++++++----------
 1 file changed, 73 insertions(+), 65 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 09639d5a81..0eeb8832f4 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -34,7 +35,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.lang.IgniteTriConsumer;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.DefaultMessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
@@ -42,10 +42,12 @@ import 
org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
 import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxCleanupMessage;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
-import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -72,8 +74,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         });
     }
 
-    private void testFinishRow(Configurator msgConf, 
IgniteTriConsumer<InternalTransaction, TableImpl, Tuple> finisher)
-            throws ExecutionException, InterruptedException {
+    private Context prepareTransactionData() throws ExecutionException, 
InterruptedException {
         createTestTableWith3Replicas();
 
         var tblReplicationGrp = defaultTablePartitionId(node(0));
@@ -103,9 +104,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
 
         tbl.recordView().upsert(rwTx, tpl);
 
-        msgConf.accept(primaryNode, coordinatorNode, tbl, rwTx);
-
-        finisher.accept(rwTx, tbl, keyTpl);
+        return new Context(primaryNode, coordinatorNode, tbl, rwTx, keyTpl);
     }
 
     private TablePartitionId defaultTablePartitionId(IgniteImpl node) {
@@ -114,7 +113,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         return new TablePartitionId(table.tableId(), 0);
     }
 
-    private void commitRow(InternalTransaction rwTx, TableImpl tbl, Tuple 
keyTpl) {
+    private void commitAndValidate(InternalTransaction rwTx, TableImpl tbl, 
Tuple keyTpl) {
         rwTx.commit();
 
         Tuple storedData = tbl.recordView().get(null, keyTpl);
@@ -132,15 +131,20 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
 
     @Test
     void testChangedPrimaryOnFinish() throws ExecutionException, 
InterruptedException {
-        testFinishRow(this::changedPrimaryOnFinish, this::commitRow);
+        Context context = prepareTransactionData();
+
+        // Drop all finish messages to the old primary, pick a new one.
+        // The coordinator will get a response from the new primary.
+        CompletableFuture<Void> transferPrimaryFuture = 
changePrimaryOnFinish(context.coordinatorNode, context.tbl);
+
+        // The primary is changed after calculating the outcome and commit 
timestamp.
+        // The new primary successfully commits such a transaction.
+        commitAndValidate(context.tx, context.tbl, context.keyTpl);
+
+        assertThat(transferPrimaryFuture, willCompleteSuccessfully());
     }
 
-    private void changedPrimaryOnFinish(
-            IgniteImpl primaryNode,
-            IgniteImpl coordinatorNode,
-            TableImpl tbl,
-            InternalTransaction tx
-    ) {
+    private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl 
coordinatorNode, TableImpl tbl) {
         DefaultMessagingService coordinatorMessaging = 
messaging(coordinatorNode);
 
         AtomicBoolean dropMessage = new AtomicBoolean(true);
@@ -164,7 +168,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         // Now change the commit primary and run tx.commit().
         // The transfer is performed asynchronously because the message 
processing block we added earlier
         // will run in the current thread.
-        CompletableFuture.runAsync(() -> {
+        return CompletableFuture.runAsync(() -> {
             try {
                 commitStartedLatch.await();
 
@@ -182,20 +186,21 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    void testCoordinatorMissedResponse() throws ExecutionException, 
InterruptedException {
-        testFinishRow(this::coordinatorMissedResponse, this::commitRow);
+    void testCommitOverCommit() throws ExecutionException, 
InterruptedException {
+        Context context = prepareTransactionData();
+
+        // The coordinator does not get the response to the first commit 
message, but the message still reaches the primary and succeeds.
+        // The coordinator has to retry the finish request and survive a 
COMMIT over COMMIT.
+        coordinatorDropsFirstFinishMessage(context.coordinatorNode);
+
+        commitAndValidate(context.tx, context.tbl, context.keyTpl);
 
         for (CompletableFuture<?> future : futures) {
             assertThat(future, willCompleteSuccessfully());
         }
     }
 
-    private void coordinatorMissedResponse(
-            IgniteImpl primaryNode,
-            IgniteImpl coordinatorNode,
-            TableImpl tbl,
-            InternalTransaction tx
-    ) {
+    private void coordinatorDropsFirstFinishMessage(IgniteImpl 
coordinatorNode) {
         DefaultMessagingService coordinatorMessaging = 
messaging(coordinatorNode);
         // Make sure the finish message is prepared, i.e. the outcome, commit 
timestamp, primary node, etc. have been set,
         // and then temporarily block the messaging to simulate network issues.
@@ -224,26 +229,32 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    void testWaitForCleanup() throws ExecutionException, InterruptedException {
-        testFinishRow(this::waitForCleanup, this::commitRow);
+    void testChangePrimaryOnCleanup() throws ExecutionException, 
InterruptedException {
+        Context context = prepareTransactionData();
+
+        // The transaction is committed but the primary expires right before 
applying the cleanup message.
+        CompletableFuture<Void> transferPrimaryFuture = 
changePrimaryOnCleanup(context.primaryNode, context.tbl);
+
+        commitAndValidate(context.tx, context.tbl, context.keyTpl);
+
+        assertThat(transferPrimaryFuture, willCompleteSuccessfully());
     }
 
-    private void waitForCleanup(
-            IgniteImpl primaryNode,
-            IgniteImpl coordinatorNode,
-            TableImpl tbl,
-            InternalTransaction tx
-    ) {
+    private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl 
primaryNode, TableImpl tbl) {
         DefaultMessagingService primaryMessaging = messaging(primaryNode);
 
         AtomicBoolean dropMessage = new AtomicBoolean(true);
 
+        CountDownLatch cleanupStartedLatch = new CountDownLatch(1);
+
         // Make sure the finish message is prepared, i.e. the outcome, commit 
timestamp, primary node, etc. have been set,
         // and then temporarily block the messaging to simulate network issues.
         primaryMessaging.dropMessages((s, networkMessage) -> {
-            if (networkMessage instanceof WriteIntentSwitchReplicaRequest && 
dropMessage.get()) {
+            if (networkMessage instanceof TxCleanupMessage && 
dropMessage.get()) {
                 logger().info("Dropping message: {}.", networkMessage);
 
+                cleanupStartedLatch.countDown();
+
                 return true;
             }
 
@@ -253,8 +264,10 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         // Now change the commit primary and run tx.commit().
         // The transfer is performed asynchronously because the message 
processing block we added earlier
         // will run in the current thread.
-        CompletableFuture.runAsync(() -> {
+        return CompletableFuture.runAsync(() -> {
             try {
+                cleanupStartedLatch.await();
+
                 logger().info("Start transferring primary.");
 
                 NodeUtils.transferPrimary(tbl, null, this::node);
@@ -270,16 +283,20 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
 
     @Test
     void testCommitAlreadyAbortedTx() throws ExecutionException, 
InterruptedException {
-        testFinishRow(this::commitAlreadyAbortedTx,
-                (transaction, table, objects) -> 
assertThrows(TransactionException.class, transaction::commit));
+        Context context = prepareTransactionData();
+
+        // Simulate the state when a tx has already been aborted by writing 
a corresponding state into tx state storage.
+        markTxAbortedInTxStateStorage(context.primaryNode, context.tx);
+
+        // Tx.commit should throw a TransactionException caused by MismatchingTransactionOutcomeException.
+        TransactionException transactionException = 
assertThrows(TransactionException.class, context.tx::commit);
+
+        Throwable cause = 
ExceptionUtils.unwrapCause(transactionException.getCause());
+
+        assertInstanceOf(MismatchingTransactionOutcomeException.class, cause);
     }
 
-    private void commitAlreadyAbortedTx(
-            IgniteImpl primaryNode,
-            IgniteImpl coordinatorNode,
-            TableImpl tbl,
-            InternalTransaction tx
-    ) {
+    private void markTxAbortedInTxStateStorage(IgniteImpl primaryNode, 
InternalTransaction tx) {
         TableImpl primaryTbl = (TableImpl) 
primaryNode.tables().table(TABLE_NAME);
 
         TxStateStorage storage = 
primaryTbl.internalTable().txStateStorage().getTxStateStorage(0);
@@ -291,22 +308,6 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         storage.put(tx.id(), txMetaToSet);
     }
 
-    /**
-     * Gets Ignite instance by the name.
-     *
-     * @param name Node name.
-     * @return Ignite instance.
-     */
-    private @Nullable IgniteImpl node(String name) {
-        for (int i = 0; i < initialNodes(); i++) {
-            if (node(i).name().equals(name)) {
-                return node(i);
-            }
-        }
-
-        return null;
-    }
-
     private @Nullable Integer nodeIndex(String name) {
         for (int i = 0; i < initialNodes(); i++) {
             if (node(i).name().equals(name)) {
@@ -317,13 +318,20 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         return null;
     }
 
-    private interface Configurator {
-        void accept(
-                IgniteImpl primaryNode,
-                IgniteImpl coordinatorNode,
-                TableImpl tbl,
-                InternalTransaction tx
-        );
+    private static class Context {
+        private final IgniteImpl primaryNode;
+        private final IgniteImpl coordinatorNode;
+        private final TableImpl tbl;
+        private final InternalTransaction tx;
+        private final Tuple keyTpl;
+
+        private Context(IgniteImpl primaryNode, IgniteImpl coordinatorNode, 
TableImpl tbl, InternalTransaction tx, Tuple keyTpl) {
+            this.primaryNode = primaryNode;
+            this.coordinatorNode = coordinatorNode;
+            this.tbl = tbl;
+            this.tx = tx;
+            this.keyTpl = keyTpl;
+        }
     }
 
 }

Reply via email to