This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 38d3c20693 IGNITE-21578 Improve tests readability and logic in
ItDurableFinishTest. (#3400)
38d3c20693 is described below
commit 38d3c2069392d8069a888363a70cd66fce6f548c
Author: Cyrill <[email protected]>
AuthorDate: Thu Mar 14 15:38:48 2024 +0300
IGNITE-21578 Improve tests readability and logic in ItDurableFinishTest.
(#3400)
---
.../ignite/internal/table/ItDurableFinishTest.java | 138 +++++++++++----------
1 file changed, 73 insertions(+), 65 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 09639d5a81..0eeb8832f4 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -34,7 +35,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.lang.IgniteTriConsumer;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
@@ -42,10 +42,12 @@ import
org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxCleanupMessage;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
-import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -72,8 +74,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
});
}
- private void testFinishRow(Configurator msgConf,
IgniteTriConsumer<InternalTransaction, TableImpl, Tuple> finisher)
- throws ExecutionException, InterruptedException {
+ private Context prepareTransactionData() throws ExecutionException,
InterruptedException {
createTestTableWith3Replicas();
var tblReplicationGrp = defaultTablePartitionId(node(0));
@@ -103,9 +104,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
tbl.recordView().upsert(rwTx, tpl);
- msgConf.accept(primaryNode, coordinatorNode, tbl, rwTx);
-
- finisher.accept(rwTx, tbl, keyTpl);
+ return new Context(primaryNode, coordinatorNode, tbl, rwTx, keyTpl);
}
private TablePartitionId defaultTablePartitionId(IgniteImpl node) {
@@ -114,7 +113,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
return new TablePartitionId(table.tableId(), 0);
}
- private void commitRow(InternalTransaction rwTx, TableImpl tbl, Tuple
keyTpl) {
+ private void commitAndValidate(InternalTransaction rwTx, TableImpl tbl,
Tuple keyTpl) {
rwTx.commit();
Tuple storedData = tbl.recordView().get(null, keyTpl);
@@ -132,15 +131,20 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
@Test
void testChangedPrimaryOnFinish() throws ExecutionException,
InterruptedException {
- testFinishRow(this::changedPrimaryOnFinish, this::commitRow);
+ Context context = prepareTransactionData();
+
+ // Drop all finish messages to the old primary, pick a new one.
+ // The coordinator will get a response from the new primary.
+ CompletableFuture<Void> transferPrimaryFuture =
changePrimaryOnFinish(context.coordinatorNode, context.tbl);
+
+ // The primary is changed after calculating the outcome and commit
timestamp.
+ // The new primary successfully commits such a transaction.
+ commitAndValidate(context.tx, context.tbl, context.keyTpl);
+
+ assertThat(transferPrimaryFuture, willCompleteSuccessfully());
}
- private void changedPrimaryOnFinish(
- IgniteImpl primaryNode,
- IgniteImpl coordinatorNode,
- TableImpl tbl,
- InternalTransaction tx
- ) {
+ private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl
coordinatorNode, TableImpl tbl) {
DefaultMessagingService coordinatorMessaging =
messaging(coordinatorNode);
AtomicBoolean dropMessage = new AtomicBoolean(true);
@@ -164,7 +168,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
// Now change the commit primary and run tx.commit().
// The transfer is performed asynchronously because the message
processing block we added earlier
// will run in the current thread.
- CompletableFuture.runAsync(() -> {
+ return CompletableFuture.runAsync(() -> {
try {
commitStartedLatch.await();
@@ -182,20 +186,21 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
}
@Test
- void testCoordinatorMissedResponse() throws ExecutionException,
InterruptedException {
- testFinishRow(this::coordinatorMissedResponse, this::commitRow);
+ void testCommitOverCommit() throws ExecutionException,
InterruptedException {
+ Context context = prepareTransactionData();
+
+ // The coordinator does not get the response to the first commit
message, but the message still reaches the primary and succeeds.
+ // The coordinator has to retry the finish request and survive a
COMMIT over COMMIT.
+ coordinatorDropsFirstFinishMessage(context.coordinatorNode);
+
+ commitAndValidate(context.tx, context.tbl, context.keyTpl);
for (CompletableFuture<?> future : futures) {
assertThat(future, willCompleteSuccessfully());
}
}
- private void coordinatorMissedResponse(
- IgniteImpl primaryNode,
- IgniteImpl coordinatorNode,
- TableImpl tbl,
- InternalTransaction tx
- ) {
+ private void coordinatorDropsFirstFinishMessage(IgniteImpl
coordinatorNode) {
DefaultMessagingService coordinatorMessaging =
messaging(coordinatorNode);
// Make sure the finish message is prepared, i.e. the outcome, commit
timestamp, primary node, etc. have been set,
// and then temporarily block the messaging to simulate network issues.
@@ -224,26 +229,32 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
}
@Test
- void testWaitForCleanup() throws ExecutionException, InterruptedException {
- testFinishRow(this::waitForCleanup, this::commitRow);
+ void testChangePrimaryOnCleanup() throws ExecutionException,
InterruptedException {
+ Context context = prepareTransactionData();
+
+ // The transaction is committed but the primary expires right before
applying the cleanup message.
+ CompletableFuture<Void> transferPrimaryFuture =
changePrimaryOnCleanup(context.primaryNode, context.tbl);
+
+ commitAndValidate(context.tx, context.tbl, context.keyTpl);
+
+ assertThat(transferPrimaryFuture, willCompleteSuccessfully());
}
- private void waitForCleanup(
- IgniteImpl primaryNode,
- IgniteImpl coordinatorNode,
- TableImpl tbl,
- InternalTransaction tx
- ) {
+ private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl
primaryNode, TableImpl tbl) {
DefaultMessagingService primaryMessaging = messaging(primaryNode);
AtomicBoolean dropMessage = new AtomicBoolean(true);
+ CountDownLatch cleanupStartedLatch = new CountDownLatch(1);
+
// Make sure the finish message is prepared, i.e. the outcome, commit
timestamp, primary node, etc. have been set,
// and then temporarily block the messaging to simulate network issues.
primaryMessaging.dropMessages((s, networkMessage) -> {
- if (networkMessage instanceof WriteIntentSwitchReplicaRequest &&
dropMessage.get()) {
+ if (networkMessage instanceof TxCleanupMessage &&
dropMessage.get()) {
logger().info("Dropping message: {}.", networkMessage);
+ cleanupStartedLatch.countDown();
+
return true;
}
@@ -253,8 +264,10 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
// Now change the commit primary and run tx.commit().
// The transfer is performed asynchronously because the message
processing block we added earlier
// will run in the current thread.
- CompletableFuture.runAsync(() -> {
+ return CompletableFuture.runAsync(() -> {
try {
+ cleanupStartedLatch.await();
+
logger().info("Start transferring primary.");
NodeUtils.transferPrimary(tbl, null, this::node);
@@ -270,16 +283,20 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
@Test
void testCommitAlreadyAbortedTx() throws ExecutionException,
InterruptedException {
- testFinishRow(this::commitAlreadyAbortedTx,
- (transaction, table, objects) ->
assertThrows(TransactionException.class, transaction::commit));
+ Context context = prepareTransactionData();
+
+ // Simulate the state when a tx has already been aborted by writing
a corresponding state into tx state storage.
+ markTxAbortedInTxStateStorage(context.primaryNode, context.tx);
+
+ // Tx.commit should throw a TransactionException caused by MismatchingTransactionOutcomeException.
+ TransactionException transactionException =
assertThrows(TransactionException.class, context.tx::commit);
+
+ Throwable cause =
ExceptionUtils.unwrapCause(transactionException.getCause());
+
+ assertInstanceOf(MismatchingTransactionOutcomeException.class, cause);
}
- private void commitAlreadyAbortedTx(
- IgniteImpl primaryNode,
- IgniteImpl coordinatorNode,
- TableImpl tbl,
- InternalTransaction tx
- ) {
+ private void markTxAbortedInTxStateStorage(IgniteImpl primaryNode,
InternalTransaction tx) {
TableImpl primaryTbl = (TableImpl)
primaryNode.tables().table(TABLE_NAME);
TxStateStorage storage =
primaryTbl.internalTable().txStateStorage().getTxStateStorage(0);
@@ -291,22 +308,6 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
storage.put(tx.id(), txMetaToSet);
}
- /**
- * Gets Ignite instance by the name.
- *
- * @param name Node name.
- * @return Ignite instance.
- */
- private @Nullable IgniteImpl node(String name) {
- for (int i = 0; i < initialNodes(); i++) {
- if (node(i).name().equals(name)) {
- return node(i);
- }
- }
-
- return null;
- }
-
private @Nullable Integer nodeIndex(String name) {
for (int i = 0; i < initialNodes(); i++) {
if (node(i).name().equals(name)) {
@@ -317,13 +318,20 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
return null;
}
- private interface Configurator {
- void accept(
- IgniteImpl primaryNode,
- IgniteImpl coordinatorNode,
- TableImpl tbl,
- InternalTransaction tx
- );
+ private static class Context {
+ private final IgniteImpl primaryNode;
+ private final IgniteImpl coordinatorNode;
+ private final TableImpl tbl;
+ private final InternalTransaction tx;
+ private final Tuple keyTpl;
+
+ private Context(IgniteImpl primaryNode, IgniteImpl coordinatorNode,
TableImpl tbl, InternalTransaction tx, Tuple keyTpl) {
+ this.primaryNode = primaryNode;
+ this.coordinatorNode = coordinatorNode;
+ this.tbl = tbl;
+ this.tx = tx;
+ this.keyTpl = keyTpl;
+ }
}
}