This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 056189cbde0 IGNITE-23138 Fix flaky TxWithKeyContentionSelfTest (Cache
12 group) (#11512)
056189cbde0 is described below
commit 056189cbde0688cd48106ac9b7ddfef2413ee08b
Author: Semyon Zikunov <[email protected]>
AuthorDate: Mon Nov 18 19:31:01 2024 +1000
IGNITE-23138 Fix flaky TxWithKeyContentionSelfTest (Cache 12 group) (#11512)
---
.../transactions/TxWithKeyContentionSelfTest.java | 208 ++++++---------------
1 file changed, 60 insertions(+), 148 deletions(-)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
index fe2278918e2..4d66bb3a58d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
@@ -17,17 +17,14 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -35,21 +32,16 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -59,21 +51,14 @@ import static
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
/** Tests tx key contention detection functional. */
public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
- /** Client flag. */
- private boolean client;
-
- /** Near cache flag. */
- private boolean nearCache;
+ /** */
+ @Rule
+ public TestName testName = new TestName();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name)
throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
- cfg.setConsistentId("NODE_" + name.substring(name.length() - 1));
-
- if (client)
- cfg.setClientMode(true);
-
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
@@ -82,24 +67,12 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
)
);
- TestRecordingCommunicationSpi commSpi = new
TestRecordingCommunicationSpi();
-
- cfg.setCommunicationSpi(commSpi);
-
- cfg.setCacheConfiguration(getCacheConfiguration(DEFAULT_CACHE_NAME));
-
- if (client) {
- cfg.setConsistentId("Client");
-
- cfg.setClientMode(client);
- }
-
return cfg;
}
/** */
- protected CacheConfiguration<?, ?> getCacheConfiguration(String name) {
- CacheConfiguration<Object, Object> ccfg = new
CacheConfiguration<>(name)
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(boolean
nearCache) {
+ CacheConfiguration<Integer, Integer> ccfg = new
CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setAffinity(new RendezvousAffinityFunction(false, 16))
@@ -134,7 +107,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testPessimisticRepeatableReadCheckContentionTxMetric() throws
Exception {
- runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
+ runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, false);
}
/**
@@ -145,9 +118,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testPessimisticRepeatableReadCheckContentionTxMetricNear()
throws Exception {
- nearCache = true;
-
- runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
+ runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, true);
}
/**
@@ -156,7 +127,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testPessimisticReadCommitedCheckContentionTxMetric() throws
Exception {
- runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
+ runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, false);
}
/**
@@ -165,9 +136,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testPessimisticReadCommitedCheckContentionTxMetricNear()
throws Exception {
- nearCache = true;
-
- runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
+ runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, true);
}
/**
@@ -176,7 +145,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testOptimisticReadCommittedCheckContentionTxMetric() throws
Exception {
- runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
+ runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, false);
}
/**
@@ -185,9 +154,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testOptimisticReadCommittedCheckContentionTxMetricNear()
throws Exception {
- nearCache = true;
-
- runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
+ runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, true);
}
/**
@@ -196,7 +163,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testOptimisticRepeatableReadCheckContentionTxMetric() throws
Exception {
- runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
+ runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, false);
}
/**
@@ -205,9 +172,7 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
@Test
@WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value =
"30000")
public void testOptimisticRepeatableReadCheckContentionTxMetricNear()
throws Exception {
- nearCache = true;
-
- runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
+ runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, true);
}
/** Tests metric correct results while tx collisions occured.
@@ -216,120 +181,67 @@ public class TxWithKeyContentionSelfTest extends
GridCommonAbstractTest {
* @param isolation Isolation level.
* @throws Exception If failed.
*/
- private void runKeyCollisionsMetric(TransactionConcurrency concurrency,
TransactionIsolation isolation) throws Exception {
+ private void runKeyCollisionsMetric(TransactionConcurrency concurrency,
TransactionIsolation isolation, boolean nearCache)
+ throws Exception {
Ignite ig = startGridsMultiThreaded(3);
- int contCnt = (int)U.staticField(IgniteTxManager.class,
"COLLISIONS_QUEUE_THRESHOLD") * 5;
-
- CountDownLatch txLatch = new CountDownLatch(contCnt);
-
- ig.cluster().state(ClusterState.ACTIVE);
-
- client = true;
-
- Ignite cl = startGrid();
-
- IgniteTransactions cliTxMgr = cl.transactions();
+ int contCnt = (int)U.staticField(IgniteTxManager.class,
"COLLISIONS_QUEUE_THRESHOLD") * 2;
- IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+ Ignite cl = startClientGrid();
- IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME);
+ IgniteCache<Integer, Integer> clientCache =
cl.createCache(cacheConfiguration(nearCache));
- final Integer keyId = primaryKey(cache);
+ final Integer keyId = primaryKey(ig.cache(DEFAULT_CACHE_NAME));
- CountDownLatch blockOnce = new CountDownLatch(1);
+ IgniteTransactions transactions = cl.transactions();
- for (Ignite ig0 : G.allGrids()) {
- if (ig0.configuration().isClientMode())
- continue;
+ assertFalse(checkMetrics(ig));
- TestRecordingCommunicationSpi commSpi0 =
-
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+ AtomicBoolean doTest = new AtomicBoolean(true);
- commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode,
Message>() {
- @Override public boolean apply(ClusterNode node, Message msg) {
- if (msg instanceof GridNearTxFinishResponse &&
blockOnce.getCount() > 0) {
- blockOnce.countDown();
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(
+ () -> {
+ while (doTest.get()) {
+ try (Transaction tx = transactions.txStart(concurrency,
isolation)) {
+ clientCache.put(keyId, 0);
- return true;
+ tx.commit();
}
-
- return false;
- }
- });
- }
-
- IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
- try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
- cache0.put(keyId, 0);
- tx.commit();
- }
- });
-
- blockOnce.await();
-
- GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();
-
- for (int i = 0; i < contCnt; ++i) {
- IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
- try (Transaction tx = cliTxMgr.txStart(concurrency,
isolation)) {
- cache0.put(keyId, 0);
-
- tx.commit();
-
- txLatch.countDown();
}
- });
-
- finishFut.add(f0);
+ },
+ contCnt,
+ "txThread-" + testName.getMethodName());
+
+ try {
+ assertTrue(GridTestUtils.waitForCondition(
+ () -> checkMetrics(ig),
+ getTestTimeout()));
}
-
- finishFut.markInitialized();
-
- for (Ignite ig0 : G.allGrids()) {
- TestRecordingCommunicationSpi commSpi0 =
-
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
-
- if (ig0.configuration().isClientMode())
- continue;
-
- commSpi0.stopBlock();
+ finally {
+ doTest.set(false);
+ fut.get(getTestTimeout());
}
+ }
+ /**
+ * Checks if the transaction collision metrics contain the string
"queueSize" for the given Ignite instance.
+ *
+ * @param ig Ignite instance.
+ * @return {@code true} if the metrics contain "queueSize"; otherwise
{@code false}.
+ */
+ private static boolean checkMetrics(Ignite ig) {
IgniteTxManager srvTxMgr =
((IgniteEx)ig).context().cache().context().tm();
- assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- U.invoke(IgniteTxManager.class, srvTxMgr,
"collectTxCollisionsInfo");
- }
- catch (IgniteCheckedException e) {
- fail(e.toString());
- }
-
- CacheMetrics metrics =
ig.cache(DEFAULT_CACHE_NAME).localMetrics();
-
- String coll1 = metrics.getTxKeyCollisions();
-
- if (!coll1.isEmpty()) {
- String coll2 = metrics.getTxKeyCollisions();
-
- // check idempotent
- assertEquals(coll1, coll2);
-
- assertTrue(coll1.contains("queueSize"));
-
- return true;
- }
- else
- return false;
- }
- }, 10_000));
-
- f.get();
-
- finishFut.get();
+ try {
+ U.invoke(IgniteTxManager.class, srvTxMgr,
"collectTxCollisionsInfo");
+ }
+ catch (IgniteCheckedException e) {
+ fail(e.toString());
+ }
- txLatch.await();
+ return ig.cache(DEFAULT_CACHE_NAME)
+ .localMetrics()
+ .getTxKeyCollisions()
+ .contains("queueSize");
}
}