This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 38d3c20693 IGNITE-21578 Improve tests readability and logic in ItDurableFinishTest. (#3400) 38d3c20693 is described below commit 38d3c2069392d8069a888363a70cd66fce6f548c Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Thu Mar 14 15:38:48 2024 +0300 IGNITE-21578 Improve tests readability and logic in ItDurableFinishTest. (#3400) --- .../ignite/internal/table/ItDurableFinishTest.java | 138 +++++++++++---------- 1 file changed, 73 insertions(+), 65 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java index 09639d5a81..0eeb8832f4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -34,7 +35,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.lang.IgniteTriConsumer; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.DefaultMessagingService; import org.apache.ignite.internal.network.NetworkMessage; @@ -42,10 +42,12 @@ import 
org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException; import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxCleanupMessage; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; -import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; +import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; @@ -72,8 +74,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { }); } - private void testFinishRow(Configurator msgConf, IgniteTriConsumer<InternalTransaction, TableImpl, Tuple> finisher) - throws ExecutionException, InterruptedException { + private Context prepareTransactionData() throws ExecutionException, InterruptedException { createTestTableWith3Replicas(); var tblReplicationGrp = defaultTablePartitionId(node(0)); @@ -103,9 +104,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { tbl.recordView().upsert(rwTx, tpl); - msgConf.accept(primaryNode, coordinatorNode, tbl, rwTx); - - finisher.accept(rwTx, tbl, keyTpl); + return new Context(primaryNode, coordinatorNode, tbl, rwTx, keyTpl); } private TablePartitionId defaultTablePartitionId(IgniteImpl node) { @@ -114,7 +113,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { return new TablePartitionId(table.tableId(), 0); } - private void commitRow(InternalTransaction rwTx, TableImpl tbl, Tuple keyTpl) { + private void commitAndValidate(InternalTransaction rwTx, TableImpl tbl, Tuple keyTpl) { rwTx.commit(); Tuple storedData = 
tbl.recordView().get(null, keyTpl); @@ -132,15 +131,20 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { @Test void testChangedPrimaryOnFinish() throws ExecutionException, InterruptedException { - testFinishRow(this::changedPrimaryOnFinish, this::commitRow); + Context context = prepareTransactionData(); + + // Drop all finish messages to the old primary, pick a new one. + // The coordinator will get a response from the new primary. + CompletableFuture<Void> transferPrimaryFuture = changePrimaryOnFinish(context.coordinatorNode, context.tbl); + + // The primary is changed after calculating the outcome and commit timestamp. + // The new primary successfully commits such transaction. + commitAndValidate(context.tx, context.tbl, context.keyTpl); + + assertThat(transferPrimaryFuture, willCompleteSuccessfully()); } - private void changedPrimaryOnFinish( - IgniteImpl primaryNode, - IgniteImpl coordinatorNode, - TableImpl tbl, - InternalTransaction tx - ) { + private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl coordinatorNode, TableImpl tbl) { DefaultMessagingService coordinatorMessaging = messaging(coordinatorNode); AtomicBoolean dropMessage = new AtomicBoolean(true); @@ -164,7 +168,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { // Now change the commit primary and run tx.commit(). // The transfer is performed asynchronously because the message processing block we added earlier // will run in the current thread. 
- CompletableFuture.runAsync(() -> { + return CompletableFuture.runAsync(() -> { try { commitStartedLatch.await(); @@ -182,20 +186,21 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { } @Test - void testCoordinatorMissedResponse() throws ExecutionException, InterruptedException { - testFinishRow(this::coordinatorMissedResponse, this::commitRow); + void testCommitOverCommit() throws ExecutionException, InterruptedException { + Context context = prepareTransactionData(); + + // The coordinator does not get the response to the first commit message, but the message still reaches the primary and succeeds. + // The coordinator has to retry the finish request and survive a COMMIT over COMMIT. + coordinatorDropsFirstFinishMessage(context.coordinatorNode); + + commitAndValidate(context.tx, context.tbl, context.keyTpl); for (CompletableFuture<?> future : futures) { assertThat(future, willCompleteSuccessfully()); } } - private void coordinatorMissedResponse( - IgniteImpl primaryNode, - IgniteImpl coordinatorNode, - TableImpl tbl, - InternalTransaction tx - ) { + private void coordinatorDropsFirstFinishMessage(IgniteImpl coordinatorNode) { DefaultMessagingService coordinatorMessaging = messaging(coordinatorNode); // Make sure the finish message is prepared, i.e. the outcome, commit timestamp, primary node, etc. have been set, // and then temporarily block the messaging to simulate network issues. @@ -224,26 +229,32 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { } @Test - void testWaitForCleanup() throws ExecutionException, InterruptedException { - testFinishRow(this::waitForCleanup, this::commitRow); + void testChangePrimaryOnCleanup() throws ExecutionException, InterruptedException { + Context context = prepareTransactionData(); + + // The transaction is committed but the primary expires right before applying the cleanup message. 
+ CompletableFuture<Void> transferPrimaryFuture = changePrimaryOnCleanup(context.primaryNode, context.tbl); + + commitAndValidate(context.tx, context.tbl, context.keyTpl); + + assertThat(transferPrimaryFuture, willCompleteSuccessfully()); } - private void waitForCleanup( - IgniteImpl primaryNode, - IgniteImpl coordinatorNode, - TableImpl tbl, - InternalTransaction tx - ) { + private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl primaryNode, TableImpl tbl) { DefaultMessagingService primaryMessaging = messaging(primaryNode); AtomicBoolean dropMessage = new AtomicBoolean(true); + CountDownLatch cleanupStartedLatch = new CountDownLatch(1); + // Make sure the finish message is prepared, i.e. the outcome, commit timestamp, primary node, etc. have been set, // and then temporarily block the messaging to simulate network issues. primaryMessaging.dropMessages((s, networkMessage) -> { - if (networkMessage instanceof WriteIntentSwitchReplicaRequest && dropMessage.get()) { + if (networkMessage instanceof TxCleanupMessage && dropMessage.get()) { logger().info("Dropping message: {}.", networkMessage); + cleanupStartedLatch.countDown(); + return true; } @@ -253,8 +264,10 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { // Now change the commit primary and run tx.commit(). // The transfer is performed asynchronously because the message processing block we added earlier // will run in the current thread. 
- CompletableFuture.runAsync(() -> { + return CompletableFuture.runAsync(() -> { try { + cleanupStartedLatch.await(); + logger().info("Start transferring primary."); NodeUtils.transferPrimary(tbl, null, this::node); @@ -270,16 +283,20 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { @Test void testCommitAlreadyAbortedTx() throws ExecutionException, InterruptedException { - testFinishRow(this::commitAlreadyAbortedTx, - (transaction, table, objects) -> assertThrows(TransactionException.class, transaction::commit)); + Context context = prepareTransactionData(); + + // Simulate the state when a tx has already been aborted by writing a corresponding state into tx state storage. + markTxAbortedInTxStateStorage(context.primaryNode, context.tx); + + // Tx.commit should throw MismatchingTransactionOutcomeException. + TransactionException transactionException = assertThrows(TransactionException.class, context.tx::commit); + + Throwable cause = ExceptionUtils.unwrapCause(transactionException.getCause()); + + assertInstanceOf(MismatchingTransactionOutcomeException.class, cause); } - private void commitAlreadyAbortedTx( - IgniteImpl primaryNode, - IgniteImpl coordinatorNode, - TableImpl tbl, - InternalTransaction tx - ) { + private void markTxAbortedInTxStateStorage(IgniteImpl primaryNode, InternalTransaction tx) { TableImpl primaryTbl = (TableImpl) primaryNode.tables().table(TABLE_NAME); TxStateStorage storage = primaryTbl.internalTable().txStateStorage().getTxStateStorage(0); @@ -291,22 +308,6 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { storage.put(tx.id(), txMetaToSet); } - /** - * Gets Ignite instance by the name. - * - * @param name Node name. - * @return Ignite instance. 
- */ - private @Nullable IgniteImpl node(String name) { - for (int i = 0; i < initialNodes(); i++) { - if (node(i).name().equals(name)) { - return node(i); - } - } - - return null; - } - private @Nullable Integer nodeIndex(String name) { for (int i = 0; i < initialNodes(); i++) { if (node(i).name().equals(name)) { @@ -317,13 +318,20 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest { return null; } - private interface Configurator { - void accept( - IgniteImpl primaryNode, - IgniteImpl coordinatorNode, - TableImpl tbl, - InternalTransaction tx - ); + private static class Context { + private final IgniteImpl primaryNode; + private final IgniteImpl coordinatorNode; + private final TableImpl tbl; + private final InternalTransaction tx; + private final Tuple keyTpl; + + private Context(IgniteImpl primaryNode, IgniteImpl coordinatorNode, TableImpl tbl, InternalTransaction tx, Tuple keyTpl) { + this.primaryNode = primaryNode; + this.coordinatorNode = coordinatorNode; + this.tbl = tbl; + this.tx = tx; + this.keyTpl = keyTpl; + } } }