This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 5b360b8198 IGNITE-21473 Adjust Tx tests structure (#3169) 5b360b8198 is described below commit 5b360b8198b7007b7869316918f08270b00acba9 Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Wed Feb 7 17:31:04 2024 +0300 IGNITE-21473 Adjust Tx tests structure (#3169) --- .../ItTxAbstractDistributedTestSingleNode.java | 194 ++++++++++ .../ItTxDistributedCleanupRecoveryTest.java | 8 +- .../distributed/ItTxDistributedTestSingleNode.java | 241 +------------ .../ItTxDistributedTestSingleNodeCollocated.java | 2 +- ...xDistributedTestSingleNodeNoCleanupMessage.java | 8 +- ...ItTxDistributedTestThreeNodesThreeReplicas.java | 3 +- .../ignite/distributed/ReplicaUnavailableTest.java | 2 +- .../apache/ignite/distributed/ItTxTestCluster.java | 26 +- .../ignite/internal/table/TxAbstractTest.java | 400 ++++++++++++--------- .../internal/tx/impl/ReadWriteTransactionImpl.java | 18 +- 10 files changed, 465 insertions(+), 437 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java new file mode 100644 index 0000000000..b9dce5b0cc --- /dev/null +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.internal.table.TxAbstractTest; +import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; +import org.apache.ignite.internal.util.CollectionUtils; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.TestInfo; + +/** + * Test class that is used for the tests that should be run on single node cluster only. + */ +public abstract class ItTxAbstractDistributedTestSingleNode extends TxAbstractTest { + /** + * The constructor. + * + * @param testInfo Test info. + */ + public ItTxAbstractDistributedTestSingleNode(TestInfo testInfo) { + super(testInfo); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedCommit() { + testTransactionMultiThreadedFinish(1, false); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedCommitEmpty() { + testTransactionMultiThreadedFinish(1, true); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedRollback() { + testTransactionMultiThreadedFinish(0, false); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedRollbackEmpty() { + testTransactionMultiThreadedFinish(0, true); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedMixed() { + testTransactionMultiThreadedFinish(-1, false); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedMixedEmpty() { + testTransactionMultiThreadedFinish(-1, true); + } + + /** + * Test trying to finish a tx in multiple threads simultaneously, and enlist new operations right after the first finish. + * + * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. + * @param checkEmptyTx Whether the tx should be empty on finishing (no enlisted operations). + */ + private void testTransactionMultiThreadedFinish(int finishMode, boolean checkEmptyTx) { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(); + + var txId = ((ReadWriteTransactionImpl) tx).id(); + + log.info("Started transaction {}", txId); + + if (!checkEmptyTx) { + rv.upsert(tx, makeValue(1, 100.)); + rv.upsert(tx, makeValue(2, 200.)); + } + + int threadNum = Runtime.getRuntime().availableProcessors() * 5; + + CyclicBarrier b = new CyclicBarrier(threadNum); + CountDownLatch finishLatch = new CountDownLatch(1); + + var futEnlists = runMultiThreadedAsync(() -> { + finishLatch.await(); + var rnd = ThreadLocalRandom.current(); + + assertThrowsWithCode(TransactionException.class, TX_ALREADY_FINISHED_ERR, () -> { + if (rnd.nextBoolean()) { + rv.upsert(tx, makeValue(2, 200.)); + } else { + rv.get(tx, makeKey(1)); + } + }, "Transaction is already finished"); + + return null; + }, threadNum, "txCommitTestThread"); + + var futFinishes = runMultiThreadedAsync(() -> { + b.await(); + + finishTx(tx, finishMode); + + finishLatch.countDown(); + + return null; + }, threadNum, "txCommitTestThread"); + + assertThat(futFinishes, willSucceedFast()); + assertThat(futEnlists, willSucceedFast()); + + assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId))); + } + + /** + * Test trying to finish a read only tx in multiple threads simultaneously. + */ + @RepeatedTest(10) + public void testReadOnlyTransactionMultiThreadedFinish() { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(new TransactionOptions().readOnly(true)); + + rv.get(tx, makeKey(1)); + + int threadNum = Runtime.getRuntime().availableProcessors(); + + CyclicBarrier b = new CyclicBarrier(threadNum); + + // TODO https://issues.apache.org/jira/browse/IGNITE-21411 Check enlists are prohibited. + var futFinishes = runMultiThreadedAsync(() -> { + b.await(); + + finishTx(tx, -1); + + return null; + }, threadNum, "txCommitTestThread"); + + assertThat(futFinishes, willSucceedFast()); + } + + /** + * Finish the tx. + * + * @param tx Transaction. + * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. + */ + private void finishTx(Transaction tx, int finishMode) { + if (finishMode == 0) { + tx.rollback(); + } else if (finishMode == 1) { + tx.commit(); + } else { + var rnd = ThreadLocalRandom.current(); + if (rnd.nextBoolean()) { + tx.commit(); + } else { + tx.rollback(); + } + } + } + + @Override + protected int nodes() { + return 1; + } +} diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java index 2c55017669..4aa053bdd3 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.distributed; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.network.DefaultMessagingService; +import org.apache.ignite.internal.table.TxAbstractTest; import org.apache.ignite.internal.tx.message.TxCleanupMessage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; @@ -26,7 +27,7 @@ import org.junit.jupiter.api.TestInfo; /** * Durable cleanup test with successful recovery after the failures. */ -public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingleNode { +public class ItTxDistributedCleanupRecoveryTest extends TxAbstractTest { private AtomicInteger defaultRetryCount; @@ -82,4 +83,9 @@ public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingl log.info("Tables have been started"); } + + @Override + protected int nodes() { + return 1; + } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java index 4abe5f68ef..c4a4e95e2c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java @@ -17,255 +17,18 @@ package org.apache.ignite.distributed; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; -import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.NodeFinder; -import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; -import org.apache.ignite.internal.placementdriver.ReplicaMeta; -import org.apache.ignite.internal.raft.Loza; -import org.apache.ignite.internal.raft.Peer; -import org.apache.ignite.internal.raft.RaftNodeId; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; -import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; -import org.apache.ignite.internal.replicator.ReplicaService; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; -import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.table.TxAbstractTest; -import org.apache.ignite.internal.table.distributed.raft.PartitionListener; -import org.apache.ignite.internal.testframework.IgniteTestUtils; -import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; -import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.tx.Transaction; -import org.apache.ignite.tx.TransactionOptions; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mockito; /** * Distributed transaction test using a single partition table. */ -@ExtendWith(ConfigurationExtension.class) -public class ItTxDistributedTestSingleNode extends TxAbstractTest { - protected static int ACC_TABLE_ID = 0; - - protected static int CUST_TABLE_ID = 1; - - protected static final String ACC_TABLE_NAME = "accounts"; - - protected static final String CUST_TABLE_NAME = "customers"; - - //TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195 - @InjectConfiguration("mock: { fsync: false }") - protected RaftConfiguration raftConfiguration; - - @InjectConfiguration - protected TransactionConfiguration txConfiguration; - - @InjectConfiguration - protected StorageUpdateConfiguration storageUpdateConfiguration; - - /** - * Returns a count of nodes. - * - * @return Nodes. - */ - protected int nodes() { - return 1; - } - - /** - * Returns a count of replicas. - * - * @return Replicas. - */ - protected int replicas() { - return 1; - } - - /** - * Returns {@code true} to disable collocation by using dedicated client node. - * - * @return {@code true} to disable collocation. - */ - protected boolean startClient() { - return true; - } - - protected final TestInfo testInfo; - - protected ItTxTestCluster txTestCluster; - +public class ItTxDistributedTestSingleNode extends ItTxAbstractDistributedTestSingleNode { /** * The constructor. * * @param testInfo Test info. */ public ItTxDistributedTestSingleNode(TestInfo testInfo) { - this.testInfo = testInfo; - } - - /** - * Initialize the test state. - */ - @BeforeEach - public void before() throws Exception { - txTestCluster = new ItTxTestCluster( - testInfo, - raftConfiguration, - txConfiguration, - storageUpdateConfiguration, - workDir, - nodes(), - replicas(), - startClient(), - timestampTracker - ); - txTestCluster.prepareCluster(); - - this.igniteTransactions = txTestCluster.igniteTransactions; - - accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACC_TABLE_ID, ACCOUNTS_SCHEMA); - customers = txTestCluster.startTable(CUST_TABLE_NAME, CUST_TABLE_ID, CUSTOMERS_SCHEMA); - - log.info("Tables have been started"); - } - - /** - * Shutdowns all cluster nodes after each test. - * - * @throws Exception If failed. - */ - @AfterEach - public void after() throws Exception { - txTestCluster.shutdownCluster(); - Mockito.framework().clearInlineMocks(); - } - - /** - * Starts a node. - * - * @param name Node name. - * @param port Local port. - * @param nodeFinder Node finder. - * @return The client cluster view. - */ - protected static ClusterService startNode(TestInfo testInfo, String name, int port, - NodeFinder nodeFinder) { - var network = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder); - - network.start(); - - return network; - } - - /** {@inheritDoc} */ - @Override - protected TxManager clientTxManager() { - return txTestCluster.clientTxManager; - } - - /** {@inheritDoc} */ - @Override - protected TxManager txManager(TableViewInternal t) { - CompletableFuture<ReplicaMeta> primaryReplicaFuture = txTestCluster.placementDriver.getPrimaryReplica( - new TablePartitionId(t.tableId(), 0), - txTestCluster.clocks.get(txTestCluster.localNodeName).now()); - - assertThat(primaryReplicaFuture, willCompleteSuccessfully()); - - TxManager manager = txTestCluster.txManagers.get(primaryReplicaFuture.join().getLeaseholder()); - - assertNotNull(manager); - - return manager; - } - - /** - * Check the storage of partition is the same across all nodes. The checking is based on {@link MvPartitionStorage#lastAppliedIndex()} - * that is increased on all update storage operation. - * TODO: IGNITE-18869 The method must be updated when a proper way to compare storages will be implemented. - * - * @param table The table. - * @param partId Partition id. - * @return True if {@link MvPartitionStorage#lastAppliedIndex()} is equivalent across all nodes, false otherwise. - */ - @Override - protected boolean assertPartitionsSame(TableViewInternal table, int partId) { - long storageIdx = 0; - - for (Map.Entry<String, Loza> entry : txTestCluster.raftServers.entrySet()) { - Loza svc = entry.getValue(); - - var server = (JraftServerImpl) svc.server(); - - var groupId = new TablePartitionId(table.tableId(), partId); - - Peer serverPeer = server.localPeers(groupId).get(0); - - org.apache.ignite.raft.jraft.RaftGroupService grp = server.raftGroupService(new RaftNodeId(groupId, serverPeer)); - - var fsm = (JraftServerImpl.DelegatingStateMachine) grp.getRaftNode().getOptions().getFsm(); - - PartitionListener listener = (PartitionListener) fsm.getListener(); - - MvPartitionStorage storage = listener.getMvStorage(); - - if (storageIdx == 0) { - storageIdx = storage.lastAppliedIndex(); - } else if (storageIdx != storage.lastAppliedIndex()) { - return false; - } - } - - return true; - } - - @Override - protected void injectFailureOnNextOperation(TableViewInternal accounts) { - InternalTable internalTable = accounts.internalTable(); - ReplicaService replicaService = IgniteTestUtils.getFieldValue(internalTable, "replicaSvc"); - Mockito.doReturn(CompletableFuture.failedFuture(new Exception())).when(replicaService).invoke((String) any(), any()); - Mockito.doReturn(CompletableFuture.failedFuture(new Exception())).when(replicaService).invoke((ClusterNode) any(), any()); - } - - @Override - protected Collection<TxManager> txManagers() { - return txTestCluster.txManagers.values(); - } - - @Test - public void testIgniteTransactionsAndReadTimestamp() { - Transaction readWriteTx = igniteTransactions.begin(); - assertFalse(readWriteTx.isReadOnly()); - assertNull(((InternalTransaction) readWriteTx).readTimestamp()); - - Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true)); - assertTrue(readOnlyTx.isReadOnly()); - assertNotNull(((InternalTransaction) readOnlyTx).readTimestamp()); - - readWriteTx.commit(); - - Transaction readOnlyTx2 = igniteTransactions.begin(new TransactionOptions().readOnly(true)); - readOnlyTx2.rollback(); + super(testInfo); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeCollocated.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeCollocated.java index 87ed14496d..a5bce88e25 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeCollocated.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeCollocated.java @@ -25,7 +25,7 @@ import org.junit.jupiter.api.TestInfo; /** * Distributed transaction test using a single partition table, collocated on a leader. */ -public class ItTxDistributedTestSingleNodeCollocated extends ItTxDistributedTestSingleNode { +public class ItTxDistributedTestSingleNodeCollocated extends ItTxAbstractDistributedTestSingleNode { /** * The constructor. * diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index 33517a1f0e..e900af92ec 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.TxAbstractTest; import org.apache.ignite.internal.table.distributed.IndexLocker; import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; @@ -73,7 +74,7 @@ import org.junit.jupiter.api.TestInfo; /** * Test to Simulate missing cleanup action. */ -public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistributedTestSingleNode { +public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTest { /** A list of background cleanup futures. */ private final List<CompletableFuture<?>> cleanupFutures = new CopyOnWriteArrayList<>(); @@ -258,4 +259,9 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut private static void releaseTxLocks(UUID txId, LockManager lockManager) { lockManager.releaseAll(txId); } + + @Override + protected int nodes() { + return 1; + } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java index 930e138567..333ae059a9 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.TxAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; import org.apache.ignite.raft.jraft.rpc.RpcRequests; @@ -36,7 +37,7 @@ import org.junit.jupiter.api.TestInfo; /** * Distributed transaction test using a single partition table, 3 nodes and 3 replicas. */ -public class ItTxDistributedTestThreeNodesThreeReplicas extends ItTxDistributedTestSingleNode { +public class ItTxDistributedTestThreeNodesThreeReplicas extends TxAbstractTest { /** * The constructor. * diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java index 8a55df9419..665f78f7f5 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java @@ -18,8 +18,8 @@ package org.apache.ignite.distributed; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.distributed.ItTxDistributedTestSingleNode.startNode; import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE; +import static org.apache.ignite.internal.table.TxAbstractTest.startNode; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 84d179aac0..a390fe8280 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -286,7 +286,7 @@ public class ItTxTestCluster { /** * Initialize the test state. */ - protected void prepareCluster() throws Exception { + public void prepareCluster() throws Exception { assertTrue(nodes > 0); assertTrue(replicas > 0); @@ -888,4 +888,28 @@ public class ItTxTestCluster { clientTxStateResolver.start(); clientTxManager.start(); } + + public Map<String, Loza> raftServers() { + return raftServers; + } + + public Map<String, TxManager> txManagers() { + return txManagers; + } + + public PlacementDriver placementDriver() { + return placementDriver; + } + + public TxManager clientTxManager() { + return clientTxManager; + } + + public String localNodeName() { + return localNodeName; + } + + public Map<String, HybridClock> clocks() { + return clocks; + } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 2573ced9a0..6ff755f355 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -22,9 +22,8 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -37,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; import java.util.ArrayList; import java.util.Collection; @@ -50,22 +50,37 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Flow; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; import java.util.function.Consumer; +import org.apache.ignite.distributed.ItTxTestCluster; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.NodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; import org.apache.ignite.internal.schema.row.Row; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.HybridTimestampTracker; @@ -78,12 +93,15 @@ import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxPriority; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; +import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.internal.util.Pair; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.RaftGroupService; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -92,12 +110,15 @@ import org.apache.ignite.tx.Transaction; import org.apache.ignite.tx.TransactionException; import org.apache.ignite.tx.TransactionOptions; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -105,9 +126,23 @@ import org.mockito.quality.Strictness; /** * TODO asch IGNITE-15928 validate zero locks after test commit. */ -@ExtendWith(MockitoExtension.class) +@ExtendWith({MockitoExtension.class, ConfigurationExtension.class}) @MockitoSettings(strictness = Strictness.LENIENT) public abstract class TxAbstractTest extends IgniteAbstractTest { + protected static final double BALANCE_1 = 500; + + protected static final double BALANCE_2 = 500; + + protected static final double DELTA = 100; + + protected static int ACC_TABLE_ID = 0; + + protected static int CUST_TABLE_ID = 1; + + protected static final String ACC_TABLE_NAME = "accounts"; + + protected static final String CUST_TABLE_NAME = "customers"; + protected static SchemaDescriptor ACCOUNTS_SCHEMA = new SchemaDescriptor( 1, new Column[]{new Column("accountNumber".toUpperCase(), NativeTypes.INT64, false)}, @@ -126,15 +161,181 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { /** Customers table id -> name. */ protected TableViewInternal customers; - protected static final double BALANCE_1 = 500; + protected HybridTimestampTracker timestampTracker = new HybridTimestampTracker(); - protected static final double BALANCE_2 = 500; + protected IgniteTransactions igniteTransactions; - protected static final double DELTA = 100; + //TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195 + @InjectConfiguration("mock: { fsync: false }") + protected RaftConfiguration raftConfiguration; - protected HybridTimestampTracker timestampTracker = new HybridTimestampTracker(); + @InjectConfiguration + protected TransactionConfiguration txConfiguration; - protected IgniteTransactions igniteTransactions; + @InjectConfiguration + protected StorageUpdateConfiguration storageUpdateConfiguration; + + protected final TestInfo testInfo; + + protected ItTxTestCluster txTestCluster; + + /** + * Returns a count of nodes. + * + * @return Nodes. + */ + protected abstract int nodes(); + + /** + * Returns a count of replicas. + * + * @return Replicas. + */ + protected int replicas() { + return 1; + } + + /** + * Returns {@code true} to disable collocation by using dedicated client node. + * + * @return {@code true} to disable collocation. + */ + protected boolean startClient() { + return true; + } + + /** + * The constructor. + * + * @param testInfo Test info. + */ + public TxAbstractTest(TestInfo testInfo) { + this.testInfo = testInfo; + } + + /** + * Initialize the test state. + */ + @BeforeEach + public void before() throws Exception { + txTestCluster = new ItTxTestCluster( + testInfo, + raftConfiguration, + txConfiguration, + storageUpdateConfiguration, + workDir, + nodes(), + replicas(), + startClient(), + timestampTracker + ); + txTestCluster.prepareCluster(); + + this.igniteTransactions = txTestCluster.igniteTransactions(); + + accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACC_TABLE_ID, ACCOUNTS_SCHEMA); + customers = txTestCluster.startTable(CUST_TABLE_NAME, CUST_TABLE_ID, CUSTOMERS_SCHEMA); + + log.info("Tables have been started"); + } + + /** + * Shutdowns all cluster nodes after each test. + * + * @throws Exception If failed. + */ + @AfterEach + public void after() throws Exception { + txTestCluster.shutdownCluster(); + Mockito.framework().clearInlineMocks(); + } + + /** + * Starts a node. + * + * @param name Node name. + * @param port Local port. + * @param nodeFinder Node finder. + * @return The client cluster view. + */ + public static ClusterService startNode(TestInfo testInfo, String name, int port, + NodeFinder nodeFinder) { + var network = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder); + + network.start(); + + return network; + } + + /** {@inheritDoc} */ + protected TxManager clientTxManager() { + return txTestCluster.clientTxManager(); + } + + /** {@inheritDoc} */ + protected TxManager txManager(TableViewInternal t) { + CompletableFuture<ReplicaMeta> primaryReplicaFuture = txTestCluster.placementDriver().getPrimaryReplica( + new TablePartitionId(t.tableId(), 0), + txTestCluster.clocks().get(txTestCluster.localNodeName()).now()); + + assertThat(primaryReplicaFuture, willCompleteSuccessfully()); + + TxManager manager = txTestCluster.txManagers().get(primaryReplicaFuture.join().getLeaseholder()); + + assertNotNull(manager); + + return manager; + } + + /** + * Check the storage of partition is the same across all nodes. The checking is based on {@link MvPartitionStorage#lastAppliedIndex()} + * that is increased on all update storage operation. + * TODO: IGNITE-18869 The method must be updated when a proper way to compare storages will be implemented. + * + * @param table The table. + * @param partId Partition id. + * @return True if {@link MvPartitionStorage#lastAppliedIndex()} is equivalent across all nodes, false otherwise. + */ + protected boolean assertPartitionsSame(TableViewInternal table, int partId) { + long storageIdx = 0; + + for (Map.Entry<String, Loza> entry : txTestCluster.raftServers().entrySet()) { + Loza svc = entry.getValue(); + + var server = (JraftServerImpl) svc.server(); + + var groupId = new TablePartitionId(table.tableId(), partId); + + Peer serverPeer = server.localPeers(groupId).get(0); + + RaftGroupService grp = server.raftGroupService(new RaftNodeId(groupId, serverPeer)); + + var fsm = (JraftServerImpl.DelegatingStateMachine) grp.getRaftNode().getOptions().getFsm(); + + PartitionListener listener = (PartitionListener) fsm.getListener(); + + MvPartitionStorage storage = listener.getMvStorage(); + + if (storageIdx == 0) { + storageIdx = storage.lastAppliedIndex(); + } else if (storageIdx != storage.lastAppliedIndex()) { + return false; + } + } + + return true; + } + + protected void injectFailureOnNextOperation(TableViewInternal accounts) { + InternalTable internalTable = accounts.internalTable(); + ReplicaService replicaService = IgniteTestUtils.getFieldValue(internalTable, "replicaSvc"); + Mockito.doReturn(CompletableFuture.failedFuture(new Exception())).when(replicaService).invoke((String) any(), any()); + Mockito.doReturn(CompletableFuture.failedFuture(new Exception())).when(replicaService).invoke((ClusterNode) any(), any()); + } + + protected Collection<TxManager> txManagers() { + return txTestCluster.txManagers().values(); + } @Test public void testCommitRollbackSameTxDoesNotThrow() throws TransactionException { @@ -1747,21 +1948,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { return Tuple.create().set("name", name); } - /** - * Get a client tx manager. - * - * @return TX manager. - */ - protected abstract TxManager clientTxManager(); - - /** - * Get a tx manager on a partition leader. - * - * @param t The table. - * @return TX manager. - */ - protected abstract TxManager txManager(TableViewInternal t); - /** * Get a lock manager on a partition leader. * @@ -1772,15 +1958,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { return txManager(t).lockManager(); } - /** - * Validates table partition equality by calculating a hash code over data. - * - * @param table The table. - * @param partId Partition id. - * @return {@code True} if a replicas are the same. - */ - protected abstract boolean assertPartitionsSame(TableViewInternal table, int partId); - /** * Validates balances. * @@ -2158,142 +2335,20 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { assertThrows(TransactionException.class, () -> keyValueView.put(youngNormalTx, 1L, "normal")); } - @RepeatedTest(10) - public void testTransactionMultiThreadedCommit() { - testTransactionMultiThreadedFinish(1, false); - } - - @RepeatedTest(10) - public void testTransactionMultiThreadedCommitEmpty() { - testTransactionMultiThreadedFinish(1, true); - } - - @RepeatedTest(10) - public void testTransactionMultiThreadedRollback() { - testTransactionMultiThreadedFinish(0, false); - } - - @RepeatedTest(10) - public void testTransactionMultiThreadedRollbackEmpty() { - testTransactionMultiThreadedFinish(0, true); - } - - @RepeatedTest(10) - public void testTransactionMultiThreadedMixed() { - testTransactionMultiThreadedFinish(-1, false); - } - - @RepeatedTest(10) - public void testTransactionMultiThreadedMixedEmpty() { - testTransactionMultiThreadedFinish(-1, true); - } - - /** - * Test trying to finish a tx in multiple threads simultaneously, and enlist new operations right after the first finish. - * - * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. - * @param checkEmptyTx Whether the tx should be empty on finishing (no enlisted operations). - */ - private void testTransactionMultiThreadedFinish(int finishMode, boolean checkEmptyTx) { - var rv = accounts.recordView(); - - rv.upsert(null, makeValue(1, 1.)); - - Transaction tx = igniteTransactions.begin(); - - var txId = ((ReadWriteTransactionImpl) tx).id(); - - log.info("Started transaction {}", txId); - - if (!checkEmptyTx) { - rv.upsert(tx, makeValue(1, 100.)); - rv.upsert(tx, makeValue(2, 200.)); - } - - int threadNum = Runtime.getRuntime().availableProcessors() * 5; - - CyclicBarrier b = new CyclicBarrier(threadNum); - CountDownLatch finishLatch = new CountDownLatch(1); - - var futEnlists = runMultiThreadedAsync(() -> { - finishLatch.await(); - var rnd = ThreadLocalRandom.current(); - - assertThrowsWithCode(TransactionException.class, TX_ALREADY_FINISHED_ERR, () -> { - if (rnd.nextBoolean()) { - rv.upsert(tx, makeValue(2, 200.)); - } else { - rv.get(tx, makeKey(1)); - } - }, "Transaction is already finished"); - - return null; - }, threadNum, "txCommitTestThread"); - - var futFinishes = runMultiThreadedAsync(() -> { - b.await(); - - finishTx(tx, finishMode); - - finishLatch.countDown(); - - return null; - }, threadNum, "txCommitTestThread"); - - assertThat(futFinishes, willSucceedFast()); - assertThat(futEnlists, willSucceedFast()); - - assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId))); - } - - /** - * Test trying to finish a read only tx in multiple threads simultaneously. - */ - @RepeatedTest(10) - public void testReadOnlyTransactionMultiThreadedFinish() { - var rv = accounts.recordView(); - - rv.upsert(null, makeValue(1, 1.)); - - Transaction tx = igniteTransactions.begin(new TransactionOptions().readOnly(true)); - - rv.get(tx, makeKey(1)); - - int threadNum = Runtime.getRuntime().availableProcessors(); - - CyclicBarrier b = new CyclicBarrier(threadNum); - - // TODO https://issues.apache.org/jira/browse/IGNITE-21411 Check enlists are prohibited. - var futFinishes = runMultiThreadedAsync(() -> { - b.await(); + @Test + public void testIgniteTransactionsAndReadTimestamp() { + Transaction readWriteTx = igniteTransactions.begin(); + assertFalse(readWriteTx.isReadOnly()); + assertNull(((InternalTransaction) readWriteTx).readTimestamp()); - finishTx(tx, -1); + Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true)); + assertTrue(readOnlyTx.isReadOnly()); + assertNotNull(((InternalTransaction) readOnlyTx).readTimestamp()); - return null; - }, threadNum, "txCommitTestThread"); + readWriteTx.commit(); - assertThat(futFinishes, willSucceedFast()); - } - - /** - * Finish the tx. - * - * @param tx Transaction. - * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. - */ - private void finishTx(Transaction tx, int finishMode) { - if (finishMode == 0) { - tx.rollback(); - } else if (finishMode == 1) { - tx.commit(); - } else { - var rnd = ThreadLocalRandom.current(); - if (rnd.nextBoolean()) { - tx.commit(); - } else { - tx.rollback(); - } - } + Transaction readOnlyTx2 = igniteTransactions.begin(new TransactionOptions().readOnly(true)); + readOnlyTx2.rollback(); } /** @@ -2351,13 +2406,4 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { assertThat(res, contains(null, null)); } } - - protected abstract void injectFailureOnNextOperation(TableViewInternal accounts); - - /** - * Returns server nodes' tx managers. - * - * @return Server nodes' tx managers. - */ - protected abstract Collection<TxManager> txManagers(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index 91f3b9bae0..305d85438c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.tx.impl; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; -import static org.apache.ignite.internal.tx.TxState.FINISHING; -import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; import java.util.Map; @@ -120,22 +118,14 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { * Checks that this transaction was not finished and will be able to enlist another partition. */ private void checkEnlistPossibility() { - if (hasTxFinalizationBegun()) { + if (finishFuture != null) { + // This means that the transaction is either in final or FINISHING state. throw new TransactionException( TX_ALREADY_FINISHED_ERR, format("Transaction is already finished [id={}, state={}].", id(), state())); } } - /** - * Checks the transaction state and makes a decision depends on it. - * - * @return True when the transaction started to finalize, false otherwise. - */ - private boolean hasTxFinalizationBegun() { - return isFinalState(state()) || state() == FINISHING; - } - /** {@inheritDoc} */ @Override protected CompletableFuture<Void> finish(boolean commit) { @@ -146,9 +136,7 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { enlistPartitionLock.writeLock().lock(); try { - if (!hasTxFinalizationBegun()) { - assert finishFuture == null : "Transaction is already finished [id=" + id() + ", state=" + state() + "]."; - + if (finishFuture == null) { CompletableFuture<Void> finishFutureInternal = finishInternal(commit); finishFuture = finishFutureInternal.handle((unused, throwable) -> null);