http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java new file mode 100644 index 0000000..87539b2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionDeadlockException; +import org.apache.ignite.transactions.TransactionTimeoutException; +import org.jsr166.ThreadLocalRandom8; + +import static org.apache.ignite.internal.util.typedef.X.hasCause; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class TxDeadlockDetectionTest extends GridCommonAbstractTest { + /** Nodes count. 
*/ + private static final int NODES_CNT = 3; + + /** Cache. */ + private static final String CACHE = "cache"; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (isDebug()) { + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.failureDetectionTimeoutEnabled(false); + + cfg.setDiscoverySpi(discoSpi); + } + + TcpCommunicationSpi commSpi = new TestCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(CACHE); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setBackups(1); + ccfg.setNearConfiguration(null); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testNoHangs() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<Long> restartFut = null; + + try { + restartFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + while (!stop.get()) { + try { + U.sleep(500); + + startGrid(NODES_CNT); + + awaitPartitionMapExchange(); + + U.sleep(500); + + stopGrid(NODES_CNT); + } + catch (Exception e) { + // No-op. 
+ } + } + } + }, 1, "restart-thread"); + + long stopTime = System.currentTimeMillis() + 2 * 60_000L; + + for (int i = 0; System.currentTimeMillis() < stopTime; i++) { + log.info(">>> Iteration " + i); + + final AtomicInteger threadCnt = new AtomicInteger(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int threadNum = threadCnt.getAndIncrement(); + + Ignite ignite = ignite(threadNum % NODES_CNT); + + IgniteCache<Integer, Integer> cache = ignite.cache(CACHE); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 500, 0)) { + ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + + for (int i = 0; i < 50; i++) { + int key = rnd.nextInt(50); + + if (log.isDebugEnabled()) { + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key + ']'); + } + + cache.put(key, 0); + } + + tx.commit(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }, NODES_CNT * 3, "tx-thread"); + + fut.get(); + } + } + finally { + stop.set(true); + + if (restartFut != null) + restartFut.get(); + + checkDetectionFuts(); + } + } + + /** + * @throws Exception If failed. 
+     */
+    public void testNoDeadlockSimple() throws Exception {
+        // Both transactions write the same key; the test expects a timeout and no deadlock report.
+        final int txTimeout = 500;
+
+        final AtomicInteger nodeIdxGen = new AtomicInteger();
+        final AtomicBoolean timeoutSeen = new AtomicBoolean();
+        final AtomicBoolean deadlockSeen = new AtomicBoolean();
+        final CyclicBarrier lockBarrier = new CyclicBarrier(2);
+
+        Runnable txBody = new Runnable() {
+            @Override public void run() {
+                // Thread N runs its transaction on node N.
+                Ignite node = ignite(nodeIdxGen.getAndIncrement());
+
+                IgniteCache<Integer, Integer> c = node.cache(CACHE);
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, txTimeout, 0)) {
+                    final int key = 42;
+
+                    if (log.isDebugEnabled())
+                        log.debug(">>> Performs put [node=" + ((IgniteKernal)node).localNode() +
+                            ", tx=" + tx + ", key=" + key + ']');
+
+                    c.put(key, 0);
+
+                    // Wait slightly longer than the transaction timeout.
+                    lockBarrier.await(txTimeout + 100, TimeUnit.MILLISECONDS);
+
+                    tx.commit();
+                }
+                catch (Exception e) {
+                    boolean timeoutCause = hasCause(e, TransactionTimeoutException.class);
+                    boolean deadlockCause = hasCause(e, TransactionDeadlockException.class);
+
+                    if (timeoutCause)
+                        timeoutSeen.set(true);
+
+                    if (deadlockCause)
+                        deadlockSeen.set(true);
+                }
+            }
+        };
+
+        GridTestUtils.runMultiThreadedAsync(txBody, 2, "tx-thread").get();
+
+        assertTrue(timeoutSeen.get());
+
+        assertFalse(deadlockSeen.get());
+
+        checkDetectionFuts();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoDeadlock() throws Exception {
+        // Repeats the scenario for 2..10 concurrent transactions.
+        for (int n = 2; n <= 10; n++) {
+            final int txCnt = n;
+
+            log.info(">>> Test with " + txCnt + " transactions.");
+
+            final int txTimeout = 500;
+
+            final AtomicInteger txNumGen = new AtomicInteger();
+            final AtomicBoolean timeoutSeen = new AtomicBoolean();
+            final AtomicBoolean deadlockSeen = new AtomicBoolean();
+            final CyclicBarrier firstLockBarrier = new CyclicBarrier(txCnt);
+
+            Runnable txBody = new Runnable() {
+                @Override public void run() {
+                    // Transaction numbers start from 1.
+                    int txNum = txNumGen.incrementAndGet();
+
+                    Ignite node = ignite(txNum % NODES_CNT);
+
+                    IgniteCache<Integer, Integer> c = node.cache(CACHE);
+
+                    try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, txTimeout, 0)) {
+                        int ownKey = txNum;
+
+                        log.info(">>> Performs put [node=" + ((IgniteKernal)node).localNode() +
+                            ", tx=" + tx + ", key=" + ownKey + ']');
+
+                        c.put(ownKey, 0);
+
+                        firstLockBarrier.await();
+
+                        if (txNum != txCnt) {
+                            // Every transaction but the last one goes for the key of its successor.
+                            int nextKey = txNum + 1;
+
+                            log.info(">>> Performs put [node=" + ((IgniteKernal)node).localNode() +
+                                ", tx=" + tx + ", key2=" + nextKey + ']');
+
+                            c.put(nextKey, 1);
+                        }
+                        else {
+                            // The last transaction just holds its key for twice the timeout.
+                            log.info(">>> Performs sleep. [node=" + ((IgniteKernal)node).localNode() +
+                                ", tx=" + tx + ']');
+
+                            U.sleep(txTimeout * 2);
+                        }
+
+                        tx.commit();
+                    }
+                    catch (Exception e) {
+                        boolean timeoutCause = hasCause(e, TransactionTimeoutException.class);
+                        boolean deadlockCause = hasCause(e, TransactionDeadlockException.class);
+
+                        if (timeoutCause)
+                            timeoutSeen.set(true);
+
+                        if (deadlockCause)
+                            deadlockSeen.set(true);
+                    }
+                }
+            };
+
+            GridTestUtils.runMultiThreadedAsync(txBody, txCnt, "tx-thread").get();
+
+            assertTrue(timeoutSeen.get());
+
+            assertFalse(deadlockSeen.get());
+
+            checkDetectionFuts();
+        }
+    }
+
+    /**
+     * Runs the failed-message scenario with {@code TxLocksRequest} as the failing message class.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedTxLocksRequest() throws Exception {
+        doTestFailedMessage(TxLocksRequest.class);
+    }
+
+    /**
+     * @throws Exception If failed.
+ */ + public void testFailedTxLocksResponse() throws Exception { + doTestFailedMessage(TxLocksResponse.class); + } + + /** + * @param failCls Failing message class. + * @throws Exception If failed. + */ + private void doTestFailedMessage(Class failCls) throws Exception { + try { + final int txCnt = 2; + + final CyclicBarrier barrier = new CyclicBarrier(2); + + final AtomicInteger threadCnt = new AtomicInteger(); + + final AtomicBoolean deadlock = new AtomicBoolean(); + + final AtomicBoolean timeout = new AtomicBoolean(); + + TestCommunicationSpi.failCls = failCls; + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int num = threadCnt.getAndIncrement(); + + Ignite ignite = ignite(num); + + IgniteCache<Object, Integer> cache = ignite.cache(CACHE); + + try (Transaction tx = + ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, num == 0 ? 500 : 1500, 0) + ) { + int key1 = primaryKey(ignite((num + 1) % txCnt).cache(CACHE)); + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key1 + ']'); + + cache.put(new TestKey(key1), 1); + + barrier.await(); + + int key2 = primaryKey(cache); + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key2 + ']'); + + cache.put(new TestKey(key2), 2); + + tx.commit(); + } + catch (Exception e) { + timeout.compareAndSet(false, hasCause(e, TransactionTimeoutException.class)); + + deadlock.compareAndSet(false, hasCause(e, TransactionDeadlockException.class)); + } + } + }, 2, "tx-thread"); + + fut.get(); + + assertFalse(deadlock.get()); + + assertTrue(timeout.get()); + + checkDetectionFuts(); + } + finally { + TestCommunicationSpi.failCls = null; + TestKey.failSer = false; + } + } + + /** + * + */ + private void checkDetectionFuts() { + for (int i = 0; i < NODES_CNT ; i++) { + Ignite ignite = ignite(i); + + IgniteTxManager txMgr = 
((IgniteKernal)ignite).context().cache().context().tm(); + + ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs = + GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts"); + + assertTrue(futs.isEmpty()); + } + } + + /** + * + */ + private static class TestKey implements Externalizable { + /** Fail request. */ + private static volatile boolean failSer = false; + + /** Id. */ + private int id; + + /** + * Default constructor (required by Externalizable). + */ + public TestKey() { + // No-op. + } + + /** + * @param id Id. + */ + public TestKey(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + if (failSer) { + TestCommunicationSpi.failCls = null; + failSer = false; + + throw new IOException(); + } + + id = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TestKey key = (TestKey)o; + + return id == key.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Fail response. */ + private static volatile Class failCls; + + /** {@inheritDoc} */ + @Override public void sendMessage( + ClusterNode node, + Message msg, + IgniteInClosure<IgniteException> ackC + ) throws IgniteSpiException { + if (failCls != null && msg instanceof GridIoMessage && + ((GridIoMessage)msg).message().getClass() == failCls) + TestKey.failSer = true; + + super.sendMessage(node, msg, ackC); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java new file mode 100644 index 0000000..abbefd0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionDeadlockException; +import org.apache.ignite.transactions.TransactionTimeoutException; + +import static org.apache.ignite.internal.util.typedef.X.hasCause; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class TxPessimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest { + /** Nodes count. 
*/ + private static final int NODES_CNT = 2; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (isDebug()) { + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.failureDetectionTimeoutEnabled(false); + + cfg.setDiscoverySpi(discoSpi); + } + + CacheConfiguration ccfg0 = defaultCacheConfiguration(); + + ccfg0.setName("cache0"); + ccfg0.setCacheMode(CacheMode.PARTITIONED); + ccfg0.setBackups(1); + ccfg0.setNearConfiguration(null); + + CacheConfiguration ccfg1 = defaultCacheConfiguration(); + + ccfg1.setName("cache1"); + ccfg1.setCacheMode(CacheMode.PARTITIONED); + ccfg1.setBackups(1); + ccfg1.setNearConfiguration(null); + + cfg.setCacheConfiguration(ccfg0, ccfg1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testDeadlock() throws Exception { + final CyclicBarrier barrier = new CyclicBarrier(2); + + final AtomicInteger threadCnt = new AtomicInteger(); + + final AtomicBoolean deadlock = new AtomicBoolean(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int threadNum = threadCnt.getAndIncrement(); + + Ignite ignite = ignite(0); + + IgniteCache<Integer, Integer> cache1 = ignite.cache("cache" + (threadNum == 0 ? 0 : 1)); + + IgniteCache<Integer, Integer> cache2 = ignite.cache("cache" + (threadNum == 0 ? 
1 : 0)); + + try (Transaction tx = + ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 500, 0) + ) { + int key1 = primaryKey(cache1); + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key1 + ", cache=" + cache1.getName() + ']'); + + cache1.put(key1, 0); + + barrier.await(); + + int key2 = primaryKey(cache2); + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key2 + ", cache=" + cache2.getName() + ']'); + + cache2.put(key2, 1); + + tx.commit(); + } + catch (Throwable e) { + // At least one stack trace should contain TransactionDeadlockException. + if (hasCause(e, TransactionTimeoutException.class) && + hasCause(e, TransactionDeadlockException.class) + ) { + if (deadlock.compareAndSet(false, true)) + U.error(log, "At least one stack trace should contain " + + TransactionDeadlockException.class.getSimpleName(), e); + } + } + } + }, 2, "tx-thread"); + + fut.get(); + + assertTrue(deadlock.get()); + + for (int i = 0; i < NODES_CNT ; i++) { + Ignite ignite = ignite(i); + + IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm(); + + ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs = + GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts"); + + assertTrue(futs.isEmpty()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java new file mode 100644 index 0000000..ee1a989 --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.util.typedef.X.cause;
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests deadlock detection for pessimistic transactions.
+ */
+public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Nodes count (actually twice as many nodes will be started: servers + clients). */
+    private static final int NODES_CNT = 4;
+
+    /** No op transformer. 
*/
+    private static final NoOpTransformer NO_OP_TRANSFORMER = new NoOpTransformer();
+
+    /** Wrapping transformer. */
+    private static final WrappingTransformer WRAPPING_TRANSFORMER = new WrappingTransformer();
+
+    /** Client mode flag: read by {@code getConfiguration} for every node that is started. */
+    private static boolean client;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // In debug mode discovery failure detection is switched off on a fresh discovery SPI.
+        if (isDebug()) {
+            TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+            discoSpi.failureDetectionTimeoutEnabled(false);
+
+            cfg.setDiscoverySpi(discoSpi);
+        }
+
+        // Whatever the static flag holds at node start decides whether the node is a client.
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Nodes with indexes [0, NODES_CNT) are started with client mode off.
+        client = false;
+
+        startGrids(NODES_CNT);
+
+        // Nodes with indexes [NODES_CNT, 2 * NODES_CNT) are started with client mode on.
+        // The flag is left set to true afterwards.
+        client = true;
+
+        for (int i = 0; i < NODES_CNT; i++)
+            startGrid(i + NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Runs the deadlock scenarios on a partitioned cache without near cache, once per write
+     * synchronization mode and per key transformer. A fresh cache is created for every run.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDeadlocksPartitioned() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * Same as {@link #testDeadlocksPartitioned()} but with near cache enabled.
+     * NOTE(review): the leading underscore presumably keeps the test runner from picking this
+     * method up (i.e. the test is disabled) - confirm the reason and track it.
+     *
+     * @throws Exception If failed.
+     */
+    public void _testDeadlocksPartitionedNear() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+ */ + public void testDeadlocksReplicated() throws Exception { + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + doTestDeadlocks(createCache(REPLICATED, syncMode, false), NO_OP_TRANSFORMER); + doTestDeadlocks(createCache(REPLICATED, syncMode, false), WRAPPING_TRANSFORMER); + } + } + + /** + * @throws Exception If failed. + */ + public void testDeadlocksLocal() throws Exception { + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + IgniteCache cache = null; + + try { + cache = createCache(LOCAL, syncMode, false); + + awaitPartitionMapExchange(); + + doTestDeadlock(2, true, true, false, NO_OP_TRANSFORMER); + doTestDeadlock(2, true, true, false, WRAPPING_TRANSFORMER); + } + finally { + if (cache != null) + cache.destroy(); + } + } + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write sync mode. + * @param near Near. + */ + @SuppressWarnings("unchecked") + private IgniteCache createCache(CacheMode cacheMode, CacheWriteSynchronizationMode syncMode, boolean near) { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(CACHE_NAME); + ccfg.setCacheMode(cacheMode); + ccfg.setBackups(1); + ccfg.setNearConfiguration(near ? new NearCacheConfiguration() : null); + ccfg.setWriteSynchronizationMode(syncMode); + + return ignite(0).getOrCreateCache(ccfg); + } + + /** + * @throws Exception If failed. 
+     */
+    private void doTestDeadlocks(IgniteCache cache, IgniteClosure<Integer, Object> transformer) throws Exception {
+        // Runs the non-local deadlock scenario for 2, 3 and 4 transactions with three flag
+        // combinations each, then destroys the given cache no matter how the runs ended.
+        try {
+            awaitPartitionMapExchange();
+
+            doTestDeadlock(2, false, true, true, transformer);
+            doTestDeadlock(2, false, false, false, transformer);
+            doTestDeadlock(2, false, false, true, transformer);
+
+            doTestDeadlock(3, false, true, true, transformer);
+            doTestDeadlock(3, false, false, false, transformer);
+            doTestDeadlock(3, false, false, true, transformer);
+
+            doTestDeadlock(4, false, true, true, transformer);
+            doTestDeadlock(4, false, false, false, transformer);
+            doTestDeadlock(4, false, false, true, transformer);
+        }
+        catch (Exception e) {
+            // The exception is logged here; fail() itself carries no message or cause.
+            U.error(log, "Unexpected exception: ", e);
+
+            fail();
+        }
+        finally {
+            if (cache != null)
+                cache.destroy();
+        }
+    }
+
+    /**
+     * Provokes a lock cycle between several pessimistic transactions and verifies that it is
+     * reported: every transaction locks its first key, all of them meet on a barrier, then each one
+     * tries to write its second key together with some extra keys. Afterwards the method checks that
+     * a {@link TransactionDeadlockException} was captured, that no transactions on the test cache,
+     * detection futures or local entry locks are left on any node, and that the exception message
+     * lists every involved transaction and exactly the keys that were locked before the barrier.
+     *
+     * @param txCnt Number of barrier parties and, for non-local runs, of transaction threads.
+     * @param loc If {@code true} all transactions run on node 0 and two threads are used.
+     * @param lockPrimaryFirst Passed negated to {@code generateKeys} as its {@code reverse} flag.
+     * @param clientTx If {@code true} (and not {@code loc}) the node index is shifted by {@code txCnt}.
+     * @param transformer Converts integer keys to the key objects actually put into the cache.
+     * @throws Exception If failed.
+     */
+    private void doTestDeadlock(
+        final int txCnt,
+        final boolean loc,
+        boolean lockPrimaryFirst,
+        final boolean clientTx,
+        final IgniteClosure<Integer, Object> transformer
+    ) throws Exception {
+        log.info(">>> Test deadlock [txCnt=" + txCnt + ", loc=" + loc + ", lockPrimaryFirst=" + lockPrimaryFirst +
+            ", clientTx=" + clientTx + ", transformer=" + transformer.getClass().getName() + ']');
+
+        final AtomicInteger threadCnt = new AtomicInteger();
+
+        final CyclicBarrier barrier = new CyclicBarrier(txCnt);
+
+        // First deadlock exception seen by any transaction thread.
+        final AtomicReference<TransactionDeadlockException> deadlockErr = new AtomicReference<>();
+
+        final List<List<Integer>> keySets = generateKeys(txCnt, loc, !lockPrimaryFirst);
+
+        // involvedKeys: every key a transaction touched or tried to touch;
+        // involvedLockedKeys: keys whose first put() returned before the barrier.
+        final Set<Integer> involvedKeys = new GridConcurrentHashSet<>();
+        final Set<Integer> involvedLockedKeys = new GridConcurrentHashSet<>();
+        final Set<IgniteInternalTx> involvedTxs = new GridConcurrentHashSet<>();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                // Thread numbers start from 1.
+                int threadNum = threadCnt.incrementAndGet();
+
+                // NOTE(review): for clientTx the index is offset by txCnt, not NODES_CNT, so for
+                // txCnt < NODES_CNT it addresses nodes started with client mode off - confirm intended.
+                Ignite ignite = loc ? ignite(0) : ignite(clientTx ? threadNum - 1 + txCnt : threadNum - 1);
+
+                IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME);
+
+                List<Integer> keys = keySets.get(threadNum - 1);
+
+                int txTimeout = 500 + txCnt * 100;
+
+                try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, txTimeout, 0)) {
+                    involvedTxs.add(((TransactionProxyImpl)tx).tx());
+
+                    Integer key = keys.get(0);
+
+                    involvedKeys.add(key);
+
+                    Object k;
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                        ", tx=" + tx + ", key=" + transformer.apply(key) + ']');
+
+                    cache.put(transformer.apply(key), 0);
+
+                    involvedLockedKeys.add(key);
+
+                    // All transactions hold their first key before anyone goes for the second one.
+                    barrier.await();
+
+                    key = keys.get(1);
+
+                    ClusterNode primaryNode =
+                        ((IgniteCacheProxy)cache).context().affinity().primary(key, NONE);
+
+                    // Five more keys from the primary node of the second key, searched from a
+                    // per-thread starting point.
+                    List<Integer> primaryKeys =
+                        primaryKeys(grid(primaryNode).cache(CACHE_NAME), 5, key + (100 * threadNum));
+
+                    Map<Object, Integer> entries = new HashMap<>();
+
+                    involvedKeys.add(key);
+
+                    entries.put(transformer.apply(key), 0);
+
+                    for (Integer i : primaryKeys) {
+                        involvedKeys.add(i);
+
+                        entries.put(transformer.apply(i), 1);
+
+                        // Each extra key also brings in the key 13 above it.
+                        k = transformer.apply(i + 13);
+
+                        involvedKeys.add(i + 13);
+
+                        entries.put(k, 2);
+                    }
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                        ", tx=" + tx + ", entries=" + entries + ']');
+
+                    cache.putAll(entries);
+
+                    tx.commit();
+                }
+                catch (Throwable e) {
+                    // At least one stack trace should contain TransactionDeadlockException.
+                    if (hasCause(e, TransactionTimeoutException.class) &&
+                        hasCause(e, TransactionDeadlockException.class)
+                    ) {
+                        if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class)))
+                            U.error(log, "At least one stack trace should contain " +
+                                TransactionDeadlockException.class.getSimpleName(), e);
+                    }
+                }
+            }
+        }, loc ? 2 : txCnt, "tx-thread");
+
+        fut.get();
+
+        // Fixed pause before the state checks below.
+        U.sleep(1000);
+
+        TransactionDeadlockException deadlockE = deadlockErr.get();
+
+        assertNotNull(deadlockE);
+
+        // Check transactions, futures and entry locks state.
+        for (int i = 0; i < NODES_CNT * 2; i++) {
+            Ignite ignite = ignite(i);
+
+            int cacheId = ((IgniteCacheProxy)ignite.cache(CACHE_NAME)).context().cacheId();
+
+            IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+            Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions();
+
+            // Only transactions that still hold entries of the test cache count as leftovers.
+            for (IgniteInternalTx tx : activeTxs) {
+                Collection<IgniteTxEntry> entries = tx.allEntries();
+
+                for (IgniteTxEntry entry : entries) {
+                    if (entry.cacheId() == cacheId)
+                        fail("Transaction still exists: " + tx);
+                }
+            }
+
+            ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs =
+                GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts");
+
+            assertTrue(futs.isEmpty());
+
+            GridCacheAdapter<Object, Integer> intCache = internalCache(i, CACHE_NAME);
+
+            GridCacheConcurrentMap map = intCache.map();
+
+            for (Integer key : involvedKeys) {
+                Object key0 = transformer.apply(key);
+
+                KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0);
+
+                GridCacheMapEntry entry = map.getEntry(keyCacheObj);
+
+                if (entry != null)
+                    assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
+            }
+        }
+
+        // Check deadlock report
+        String msg = deadlockE.getMessage();
+
+        for (IgniteInternalTx tx : involvedTxs)
+            assertTrue(msg.contains(
+                "[txId=" + tx.xidVersion() + ", nodeId=" + tx.nodeId() + ", threadId=" + tx.threadId() + ']'));
+
+        // Keys locked before the barrier must be reported; keys that were only requested must not.
+        for (Integer key : involvedKeys) {
+            if (involvedLockedKeys.contains(key))
+                assertTrue(msg.contains("[key=" + transformer.apply(key) + ", cache=" + CACHE_NAME + ']'));
+            else
+                assertFalse(msg.contains("[key=" + transformer.apply(key)));
+        }
+    }
+
+    /**
+     * @param nodesCnt Nodes count.
+     * @param loc Local cache. 
+ */ + private List<List<Integer>> generateKeys(int nodesCnt, boolean loc, boolean reverse) throws IgniteCheckedException { + List<List<Integer>> keySets = new ArrayList<>(); + + if (loc) { /* Local case: two lists with the same two keys (taken from primaryKeys() for node 0's cache) in opposite order; 'reverse' is not consulted here. */ + List<Integer> keys = primaryKeys(ignite(0).cache(CACHE_NAME), 2); + + keySets.add(new ArrayList<>(keys)); + + Collections.reverse(keys); + + keySets.add(keys); + } + else { /* One two-key list per node: a key from primaryKey() for node i, then one for the next node, wrapping around to node 0. */ + for (int i = 0; i < nodesCnt; i++) { + List<Integer> keys = new ArrayList<>(2); + + keys.add(primaryKey(ignite(i).cache(CACHE_NAME))); + keys.add(primaryKey(ignite(i == nodesCnt - 1 ? 0 : i + 1).cache(CACHE_NAME))); + + if (reverse) /* Flip the order of the pair when requested. */ + Collections.reverse(keys); + + keySets.add(keys); + } + } + + return keySets; + } + + /** + * Key transformer that returns the given integer key unchanged. + */ + private static class NoOpTransformer implements IgniteClosure<Integer, Object> { + /** {@inheritDoc} */ + @Override public Object apply(Integer val) { + return val; + } + } + + /** + * Key transformer that wraps the given integer key into a new {@link KeyObject}. + */ + private static class WrappingTransformer implements IgniteClosure<Integer, Object> { + /** {@inheritDoc} */ + @Override public Object apply(Integer val) { + return new KeyObject(val); + } + } + + /** + * Serializable key object holding an integer id and a name derived from that id. + */ + private static class KeyObject implements Serializable { + /** Id. */ + private int id; + + /** Name. */ + private String name; + + /** + * Stores the id and sets the name to {@code "KeyObject" + id}. + * + * @param id Id. 
+ */ + public KeyObject(int id) { + this.id = id; + this.name = "KeyObject" + id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "KeyObject{" + + "id=" + id + + ", name='" + name + '\'' + + '}'; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + KeyObject obj = (KeyObject)o; + + return id == obj.id && name.equals(obj.name); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { /* Hashes by id only; the constructor derives name from id, so equal objects still get equal hashes. */ + return id; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java new file mode 100644 index 0000000..45afba2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.testframework.config.GridTestProperties; + +/** + * Test suite that sets the {@code MARSH_CLASS_NAME} test property to the {@link BinaryMarshaller} class name + * and then delegates to {@link TxDeadlockDetectionTestSuite}. + */ +public class BinaryObjectsTxDeadlockDetectionTestSuite { + /** + * Sets the marshaller test property and returns the suite built by {@link TxDeadlockDetectionTestSuite#suite()}. + * + * @return Test suite. + * @throws Exception If failed. + */ + public static TestSuite suite() throws Exception { + GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); + + return TxDeadlockDetectionTestSuite.suite(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java new file mode 100644 index 0000000..c057e55 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.transactions.DepthFirstSearchTest; +import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetectionTest; +import org.apache.ignite.internal.processors.cache.transactions.TxPessimisticDeadlockDetectionCrossCacheTest; +import org.apache.ignite.internal.processors.cache.transactions.TxPessimisticDeadlockDetectionTest; + +/** + * Deadlock detection related tests: aggregates the depth-first search test and the + * transaction deadlock detection tests into a single suite. + */ +public class TxDeadlockDetectionTestSuite extends TestSuite { + /** + * Creates a suite named "Ignite Deadlock Detection Test Suite" and registers the four test classes in it. + * + * @return Test suite. + * @throws Exception If failed. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite Deadlock Detection Test Suite"); + + suite.addTestSuite(DepthFirstSearchTest.class); + suite.addTestSuite(TxPessimisticDeadlockDetectionTest.class); + suite.addTestSuite(TxPessimisticDeadlockDetectionCrossCacheTest.class); + suite.addTestSuite(TxDeadlockDetectionTest.class); + + return suite; + } +}
