http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 63c9919..f9357f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -50,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; @@ -62,6 +61,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemo import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -126,7 +127,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19"); /** Deadlock detection maximum iterations. */ - static final int DEADLOCK_MAX_ITERS = + static int DEADLOCK_MAX_ITERS = IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000); /** Committing transactions. */ @@ -389,7 +390,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * {@code false} otherwise. */ public boolean isCompleted(IgniteInternalTx tx) { - return completedVersHashMap.containsKey(tx.xidVersion()); + boolean completed = completedVersHashMap.containsKey(tx.xidVersion()); + + // Need check that for tx with timeout rollback message was not received before lock. + if (!completed && tx.local() && tx.dht() && tx.timeout() > 0) + return completedVersHashMap.containsKey(tx.nearXidVersion()); + + return completed; } /** @@ -495,13 +502,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return null; } - if (tx.timeout() > 0) { - cctx.time().addTimeoutObject(tx); - - if (log.isDebugEnabled()) - log.debug("Registered transaction with timeout processor: " + tx); - } - if (log.isDebugEnabled()) log.debug("Transaction created: " + tx); @@ -786,7 +786,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ public void prepareTx(IgniteInternalTx tx) throws IgniteCheckedException { if (tx.state() == MARKED_ROLLBACK) { - if (tx.timedOut()) + if (tx.remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); throw new IgniteCheckedException("Transaction is marked for rollback: " + tx); @@ -1081,13 +1081,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); - if (tx.timeout() > 0) { - cctx.time().removeTimeoutObject(tx); - - if (log.isDebugEnabled()) - log.debug("Unregistered transaction with timeout processor: " + tx); - } - /* * Note that write phase is handled by transaction adapter itself, * so we don't do it here. @@ -2006,17 +1999,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!(nearTxLoc || tx instanceof GridDhtTxLocal) || !hasKeys(tx, txKeys)) continue; - Collection<IgniteTxEntry> txEntries = tx.allEntries(); + IgniteTxState state = tx.txState(); + + assert state instanceof IgniteTxStateImpl || state instanceof IgniteTxImplicitSingleStateImpl; + + Collection<IgniteTxEntry> txEntries = + state instanceof IgniteTxStateImpl ? ((IgniteTxStateImpl)state).allEntriesCopy() : state.allEntries(); - Set<KeyCacheObject> requestedKeys = null; + Set<IgniteTxKey> requestedKeys = null; // Try to get info about requested keys for detached entries in case of GridNearTxLocal transaction // in order to reduce amount of requests to remote nodes. if (nearTxLoc) { - GridDhtColocatedLockFuture fut = colocatedLockFuture(tx); + if (tx.pessimistic()) { + GridDhtColocatedLockFuture fut = + (GridDhtColocatedLockFuture)mvccFuture(tx, GridDhtColocatedLockFuture.class); + + if (fut != null) + requestedKeys = fut.requestedKeys(); + + GridNearLockFuture nearFut = (GridNearLockFuture)mvccFuture(tx, GridNearLockFuture.class); - if (fut != null) - requestedKeys = fut.requestedKeys(); + if (nearFut != null) { + Set<IgniteTxKey> nearRequestedKeys = nearFut.requestedKeys(); + + if (nearRequestedKeys != null) { + if (requestedKeys == null) + requestedKeys = nearRequestedKeys; + else + requestedKeys = nearRequestedKeys; + } + } + } + else { + GridNearOptimisticTxPrepareFuture fut = + (GridNearOptimisticTxPrepareFuture)mvccFuture(tx, GridNearOptimisticTxPrepareFuture.class); + + if (fut != null) + requestedKeys = fut.requestedKeys(); + } } for (IgniteTxEntry txEntry : txEntries) { @@ -2073,17 +2094,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param tx Tx. Must be instance of {@link GridNearTxLocal}. - * @return Colocated future. + * @param cls Future class. + * @return Cache future. */ - private GridDhtColocatedLockFuture colocatedLockFuture(IgniteInternalTx tx) { + private IgniteInternalFuture mvccFuture(IgniteInternalTx tx, Class<? extends IgniteInternalFuture> cls) { assert tx instanceof GridNearTxLocal : tx; Collection<GridCacheMvccFuture<?>> futs = cctx.mvcc().mvccFutures(tx.nearXidVersion()); if (futs != null) { for (GridCacheMvccFuture<?> fut : futs) { - if (fut instanceof GridDhtColocatedLockFuture) - return (GridDhtColocatedLockFuture)fut; + if (fut.getClass().equals(cls)) + return fut; } } @@ -2115,6 +2137,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @return Collection of active transaction deadlock detection futures. + */ + @SuppressWarnings("unchecked") + public Collection<IgniteInternalFuture<?>> deadlockDetectionFutures() { + Collection<? extends IgniteInternalFuture<?>> values = deadlockDetectFuts.values(); + + return (Collection<IgniteInternalFuture<?>>)values; + } + + /** * Timeout object for node failure handler. */ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index c116d0d..f23cca9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.ignite.IgniteCheckedException; @@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -362,6 +364,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values(); } + /** + * @return All entries. Returned collection is copy of internal collection. + */ + public synchronized Collection<IgniteTxEntry> allEntriesCopy() { + return txMap == null ? Collections.<IgniteTxEntry>emptySet() : new HashSet<>(txMap.values()); + } + /** {@inheritDoc} */ @Override public IgniteTxEntry entry(IgniteTxKey key) { return txMap == null ? null : txMap.get(key); @@ -408,7 +417,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public void addEntry(IgniteTxEntry entry) { + @Override public synchronized void addEntry(IgniteTxEntry entry) { txMap.put(entry.txKey(), entry); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java index 36843dd..70d938e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java @@ -36,8 +36,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT; @@ -106,22 +109,30 @@ public class TxDeadlockDetection { stack.push(txId); while (!stack.isEmpty()) { - GridCacheVersion v = stack.pop(); + GridCacheVersion v = stack.peek(); + + if (visited.contains(v)) { + stack.pop(); + inPath.remove(v); - if (visited.contains(v)) continue; + } visited.add(v); Set<GridCacheVersion> children = wfg.get(v); - if (children == null || children.isEmpty()) + if (children == null || children.isEmpty()) { + stack.pop(); + inPath.remove(v); + continue; + } inPath.add(v); for (GridCacheVersion w : children) { - if (inPath.contains(w)) { + if (inPath.contains(w) && visited.contains(w)) { List<GridCacheVersion> cycle = new ArrayList<>(); for (GridCacheVersion x = v; !x.equals(w); x = edgeTo.get(x)) @@ -158,15 +169,18 @@ public class TxDeadlockDetection { private final Set<IgniteTxKey> keys; /** Processed keys. */ + @GridToStringInclude private final Set<IgniteTxKey> processedKeys = new HashSet<>(); /** Processed nodes. */ private final Set<UUID> processedNodes = new HashSet<>(); /** Pending keys. */ + @GridToStringInclude private Map<UUID, Set<IgniteTxKey>> pendingKeys = new HashMap<>(); /** Nodes queue. */ + @GridToStringInclude private final UniqueDeque<UUID> nodesQueue = new UniqueDeque<>(); /** Preferred nodes. */ @@ -194,6 +208,7 @@ public class TxDeadlockDetection { private int itersCnt; /** Timeout object. */ + @GridToStringExclude private DeadlockTimeoutObject timeoutObj; /** Timed out flag. */ @@ -252,8 +267,8 @@ public class TxDeadlockDetection { if (topVer == null) // Tx manager already stopped onDone(); - - map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap()); + else + map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap()); } /** @@ -441,14 +456,17 @@ public class TxDeadlockDetection { * @param txLocks Tx locks. */ private void updateWaitForGraph(Map<IgniteTxKey, TxLockList> txLocks) { + if (txLocks == null || txLocks.isEmpty()) + return; + for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) { GridCacheVersion txOwner = null; for (TxLock lock : e.getValue().txLocks()) { - if (lock.owner()) { - assert txOwner == null; - + if (lock.owner() && txOwner == null) { + // Actually we can get lock list with more than one owner. In this case ignore all owners + // except first because likely the first owner was cause of deadlock. txOwner = lock.txId(); if (keys.contains(e.getKey()) && !txId.equals(lock.txId())) { @@ -463,7 +481,7 @@ public class TxDeadlockDetection { continue; } - if (lock.candiate()) { + if (lock.candiate() || lock.owner()) { GridCacheVersion txId0 = lock.txId(); Set<GridCacheVersion> waitForTxs = wfg.get(txId0); @@ -485,9 +503,9 @@ public class TxDeadlockDetection { if (res != null && set) { if (res.classError() != null) { - IgniteLogger log = cctx.logger(TxDeadlockDetection.class); + IgniteLogger log = cctx.kernalContext().log(this.getClass()); - log.warning("Failed to finish deadlock detection due to an error: " + nodeId); + U.warn(log, "Failed to finish deadlock detection due to an error: " + nodeId); onDone(); } @@ -528,6 +546,11 @@ public class TxDeadlockDetection { return false; } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxDeadlockFuture.class, this); + } + /** * Lock request timeout object. */ @@ -543,6 +566,10 @@ public class TxDeadlockDetection { @Override public void onTimeout() { timedOut = true; + IgniteLogger log = cctx.kernalContext().log(this.getClass()); + + U.warn(log, "Deadlock detection was timed out [timeout=" + DEADLOCK_TIMEOUT + ", fut=" + this + ']'); + onDone(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java index 94b5620..2b524e8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java @@ -18,15 +18,29 @@ package org.apache.ignite.internal.processors.cache; +import javax.cache.CacheException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -34,11 +48,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionTimeoutException; -import javax.cache.CacheException; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; - import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; @@ -62,6 +71,10 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + TcpCommunicationSpi commSpi = new TestCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setAtomicityMode(atomicityMode()); @@ -87,7 +100,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { - startGrids(1); + startGrids(2); } /** {@inheritDoc} */ @@ -98,7 +111,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { /** * Success if user tx was timed out. * - * @throws Exception + * @throws Exception If failed. */ public void testUserTxTimeout() throws Exception { final Ignite ignite = grid(0); @@ -112,7 +125,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { /** * Success if system caches weren't timed out. * - * @throws Exception + * @throws Exception If failed. */ public void testSystemCacheTx() throws Exception { final Ignite ignite = grid(0); @@ -143,27 +156,23 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { * Success if implicit tx fails. * * @param cache Cache name. - * @throws Exception + * @throws Exception If failed. */ protected void checkImplicitTxTimeout(final IgniteCache<Object, Object> cache) throws Exception { - try { - cache.invoke("key", new EntryProcessor<Object, Object, Object>() { - @Override public Object process(final MutableEntry<Object, Object> entry, final Object... args) - throws EntryProcessorException { - try { - sleepForTxFailure(); - } catch (InterruptedException e) { - throw new EntryProcessorException(e); - } + TestCommunicationSpi.delay = true; - return null; - } - }); + Integer key = primaryKey(ignite(1).cache(CACHE_NAME)); + + try { + cache.put(key, 0); fail("Timeout exception must be thrown"); } catch (CacheException e) { - // OK + // No-op. + } + finally { + TestCommunicationSpi.delay = false; } cache.clear(); @@ -174,7 +183,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { * * @param cache Cache name. * @param ignite Ignite instance. - * @throws Exception + * @throws Exception If failed. */ protected void checkExplicitTxTimeout(final IgniteCache<Object, Object> cache, final Ignite ignite) throws Exception { @@ -198,7 +207,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { * Success if explicit tx doesn't fail. * * @param cache Cache instance. - * @throws Exception + * @throws Exception If failed. */ protected void checkStartTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception { try (final IgniteInternalTx tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, READ_COMMITTED)) { @@ -220,7 +229,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { * Success if implicit tx fails. * * @param cache Cache instance. - * @throws Exception + * @throws Exception If failed. */ protected void checkImplicitTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception { cache.invoke("key", new EntryProcessor<Object, Object, Object>() { @@ -241,9 +250,39 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { /** * Sleep multiple {@link #TX_TIMEOUT} times. * - * @throws InterruptedException + * @throws InterruptedException If interrupted. */ private void sleepForTxFailure() throws InterruptedException { Thread.sleep(TX_TIMEOUT * 3); } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Delay. */ + private static volatile boolean delay; + + /** {@inheritDoc} */ + @Override public void sendMessage( + final ClusterNode node, + final Message msg, + final IgniteInClosure<IgniteException> ackC + ) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Message msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridNearTxPrepareRequest && delay) { + try { + U.sleep(TX_TIMEOUT * 2); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + } + } + + super.sendMessage(node, msg, ackC); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java index c417821..8475175 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java @@ -20,10 +20,10 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; import java.util.List; import java.util.Random; -import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -154,8 +154,8 @@ public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest { assert false : "Timeout never happened for transaction: " + tx; } - catch (CacheException e) { - if (!(e.getCause() instanceof TransactionTimeoutException)) + catch (Exception e) { + if (!(X.hasCause(e, TransactionTimeoutException.class))) throw e; info("Received expected timeout exception [msg=" + e.getMessage() + ", tx=" + tx + ']'); @@ -164,4 +164,4 @@ public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest { tx.close(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java new file mode 100644 index 0000000..89fe8e0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * + */ +public class CachePartitionedMultiNodeLongTxTimeoutFullApiTest extends GridCachePartitionedMultiNodeFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java new file mode 100644 index 0000000..3e3b84e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; + +/** + * + */ +public class CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest extends + GridCachePartitionedMultiNodeFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return new NearCacheConfiguration(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java index cfa93ac..e27207d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.local; -import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -154,7 +153,7 @@ public class GridCacheLocalTxTimeoutSelfTest extends GridCommonAbstractTest { tx.commit(); } - catch (CacheException e) { + catch (Exception e) { assertTrue(X.hasCause(e, TransactionTimeoutException.class)); info("Received expected optimistic exception: " + e.getMessage()); @@ -166,4 +165,4 @@ public class GridCacheLocalTxTimeoutSelfTest extends GridCommonAbstractTest { assert wasEx; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java index 20467c2..b0a407c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java @@ -24,10 +24,12 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import junit.framework.TestCase; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.findCycle; @@ -95,6 +97,14 @@ public class DepthFirstSearchTest extends TestCase { wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ put(T1, new HashSet<GridCacheVersion>(){{add(T2);}}); + put(T2, new HashSet<GridCacheVersion>(){{add(T3);}}); + put(T4, new HashSet<GridCacheVersion>(){{add(T1); add(T2); add(T3);}}); + }}; + + assertAllNull(wfg); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, new HashSet<GridCacheVersion>(){{add(T2);}}); put(T3, new HashSet<GridCacheVersion>(){{add(T4);}}); put(T4, new HashSet<GridCacheVersion>(){{add(T1);}}); }}; @@ -228,6 +238,94 @@ public class DepthFirstSearchTest extends TestCase { } /** + * @throws Exception If failed. + */ + public void testFindCycle4() throws Exception { + Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, asLinkedHashSet(T3, T4)); + put(T3, Collections.singleton(T4)); + put(T4, Collections.singleton(T5)); + put(T6, Collections.singleton(T3)); + }}; + + assertNull(findCycle(wfg, T1)); + } + + /** + * @throws Exception If failed. + */ + public void testRandomNoExceptions() throws Exception { + int maxNodesCnt = 100; + int minNodesCnt = 10; + int maxWaitForNodesCnt = 20; + + int cyclesFound = 0; + int cyclesNotFound = 0; + + Random seedRnd = new Random(); + + Random rnd = new Random(); + + for (int i = 0; i < 50000; i++) { + long seed = seedRnd.nextLong(); + + rnd.setSeed(seed); + + System.out.println(">>> Iteration " + i + " with seed " + seed); + + int nodesCnt = rnd.nextInt(maxNodesCnt - minNodesCnt) + minNodesCnt; + + Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<>(); + + for (int j = 0; j < nodesCnt; j++) { + if (rnd.nextInt(100) > 30) { + int waitForNodesCnt = rnd.nextInt(maxWaitForNodesCnt); + + Set<GridCacheVersion> waitForNodes = null; + + if (waitForNodesCnt > 0) { + waitForNodes = new LinkedHashSet<>(); + + for (int k = 0; k < waitForNodesCnt;) { + int n = rnd.nextInt(nodesCnt); + + if (n != j) { + waitForNodes.add(new GridCacheVersion(n, 0, 0, 0)); + k++; + } + } + } + + wfg.put(new GridCacheVersion(j, 0, 0, 0), waitForNodes); + } + } + + for (int j = 0; j < nodesCnt; j++) { + try { + List<GridCacheVersion> cycle = findCycle(wfg, new GridCacheVersion(j, 0, 0, 0)); + + if (cycle == null) + cyclesNotFound++; + else + cyclesFound++; + } + catch (Throwable e) { + U.error(null, "Error during finding cycle in graph: ", e); + + U.warn(null, "Seed: " + seed); + + U.warn(null, "Wait-for-graph: " + wfg); + + fail(); + } + } + } + + System.out.println(">>> Test finished. Cycles found: " + cyclesFound + ", cycles not found: " + cyclesNotFound); + } + + /** * @param wfg Wait-for-graph. */ private static void assertAllNull(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion... ignore) { @@ -249,4 +347,4 @@ public class DepthFirstSearchTest extends TestCase { return set; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java new file mode 100644 index 0000000..c9d18eb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT; +import static org.apache.ignite.IgniteSystemProperties.getInteger; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest { + /** Nodes count. */ + private static final int NODES_CNT = 3; + + /** Cache. */ + private static final String CACHE = "cache"; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(CACHE); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setBackups(1); + ccfg.setNearConfiguration(null); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + GridTestUtils.setFieldValue(null, TxDeadlockDetection.class, "DEADLOCK_TIMEOUT", (int)(getTestTimeout() * 2)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + GridTestUtils.setFieldValue(null, TxDeadlockDetection.class, "DEADLOCK_TIMEOUT", + getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, 60000)); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000; + } + + /** + * @throws Exception If failed. + */ + public void testNoHangsPessimistic() throws Exception { + assertTrue(grid(0).context().cache().context().tm().deadlockDetectionEnabled()); + + doTest(PESSIMISTIC); + + try { + GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", 0); + + assertFalse(grid(0).context().cache().context().tm().deadlockDetectionEnabled()); + + doTest(PESSIMISTIC); + } + finally { + GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", + IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000)); + } + } + + /** + * @throws Exception If failed. + */ + public void testNoHangsOptimistic() throws Exception { + assertTrue(grid(0).context().cache().context().tm().deadlockDetectionEnabled()); + + doTest(OPTIMISTIC); + + try { + GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", 0); + + assertFalse(grid(0).context().cache().context().tm().deadlockDetectionEnabled()); + + doTest(OPTIMISTIC); + } + finally { + GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", + IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000)); + } + } + + /** + * @param concurrency Concurrency. + * @throws IgniteCheckedException If failed. + */ + private void doTest(final TransactionConcurrency concurrency) throws IgniteCheckedException { + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<Long> restartFut = null; + + try { + restartFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + while (!stop.get()) { + try { + U.sleep(500); + + startGrid(NODES_CNT); + + awaitPartitionMapExchange(); + + U.sleep(500); + + stopGrid(NODES_CNT); + } + catch (Exception e) { + // No-op. + } + } + } + }, 1, "restart-thread"); + + long stopTime = System.currentTimeMillis() + 2 * 60_000L; + + for (int i = 0; System.currentTimeMillis() < stopTime; i++) { + boolean detectionEnabled = grid(0).context().cache().context().tm().deadlockDetectionEnabled(); + + log.info(">>> Iteration " + i + " (detection is " + (detectionEnabled ? "enabled" : "disabled") + ')'); + + final AtomicInteger threadCnt = new AtomicInteger(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int threadNum = threadCnt.getAndIncrement(); + + Ignite ignite = ignite(threadNum % NODES_CNT); + + IgniteCache<Integer, Integer> cache = ignite.cache(CACHE); + + try (Transaction tx = ignite.transactions().txStart(concurrency, REPEATABLE_READ, 500, 0)) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 50; i++) { + int key = rnd.nextInt(50); + + if (log.isDebugEnabled()) { + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key + ']'); + } + + cache.put(key, 0); + } + + tx.commit(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }, NODES_CNT * 3, "tx-thread"); + + fut.get(); + } + } + finally { + stop.set(true); + + if (restartFut != null) + restartFut.get(); + + checkDetectionFutures(); + } + } + + /** + * + */ + private void checkDetectionFutures() { + for (int i = 0; i < NODES_CNT ; i++) { + Ignite ignite = ignite(i); + + IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm(); + + Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures(); + + assertTrue(futs.isEmpty()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java index 3d0beac..87bc70f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java @@ -21,8 +21,9 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.concurrent.ConcurrentMap; +import java.util.Collection; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -47,7 +48,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionTimeoutException; -import org.jsr166.ThreadLocalRandom8; import static org.apache.ignite.internal.util.typedef.X.hasCause; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -152,7 +152,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest { IgniteCache<Integer, Integer> cache = ignite.cache(CACHE); try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 700, 0)) { - ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); for (int i = 0; i < 50; i++) { int key = rnd.nextInt(50); @@ -217,7 +217,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest { cache.put(key, 0); - barrier.await(timeout + 100, TimeUnit.MILLISECONDS); + barrier.await(timeout + 1000, TimeUnit.MILLISECONDS); tx.commit(); } @@ -281,7 +281,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest { log.info(">>> Performs sleep. [node=" + ((IgniteKernal)ignite).localNode() + ", tx=" + tx + ']'); - U.sleep(timeout * 2); + U.sleep(timeout * 3); } else { int key2 = threadNum + 1; @@ -406,8 +406,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest { IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm(); - ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs = - GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts"); + Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures(); assertTrue(futs.isEmpty()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java new file mode 100644 index 0000000..7b40da2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionDeadlockException; +import org.apache.ignite.transactions.TransactionTimeoutException; + +import static org.apache.ignite.internal.util.typedef.X.hasCause; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest { + /** Nodes count. */ + private static final int NODES_CNT = 2; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (isDebug()) { + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.failureDetectionTimeoutEnabled(false); + + cfg.setDiscoverySpi(discoSpi); + } + + TcpCommunicationSpi commSpi = new TestCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + CacheConfiguration ccfg0 = defaultCacheConfiguration(); + + ccfg0.setName("cache0"); + ccfg0.setCacheMode(CacheMode.PARTITIONED); + ccfg0.setBackups(1); + ccfg0.setNearConfiguration(null); + + CacheConfiguration ccfg1 = defaultCacheConfiguration(); + + ccfg1.setName("cache1"); + ccfg1.setCacheMode(CacheMode.PARTITIONED); + ccfg1.setBackups(1); + ccfg1.setNearConfiguration(null); + + cfg.setCacheConfiguration(ccfg0, ccfg1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testDeadlock() throws Exception { + // Sometimes boh transactions perform commit, so we repeat attempt. + while (!doTestDeadlock()) {} + } + + /** + * @throws Exception If failed. + */ + private boolean doTestDeadlock() throws Exception { + TestCommunicationSpi.init(2); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + final AtomicInteger threadCnt = new AtomicInteger(); + + final AtomicBoolean deadlock = new AtomicBoolean(); + + final AtomicInteger commitCnt = new AtomicInteger(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int threadNum = threadCnt.getAndIncrement(); + + Ignite ignite = ignite(0); + + IgniteCache<Integer, Integer> cache1 = ignite.cache("cache" + (threadNum == 0 ? 0 : 1)); + + IgniteCache<Integer, Integer> cache2 = ignite.cache("cache" + (threadNum == 0 ? 1 : 0)); + + try (Transaction tx = + ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, 500, 0) + ) { + int key1 = primaryKey(cache1); + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key1 + ", cache=" + cache1.getName() + ']'); + + cache1.put(key1, 0); + + barrier.await(); + + int key2 = primaryKey(cache2); + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + key2 + ", cache=" + cache2.getName() + ']'); + + cache2.put(key2, 1); + + tx.commit(); + + commitCnt.incrementAndGet(); + } + catch (Throwable e) { + // At least one stack trace should contain TransactionDeadlockException. + if (hasCause(e, TransactionTimeoutException.class) && + hasCause(e, TransactionDeadlockException.class) + ) { + if (deadlock.compareAndSet(false, true)) + U.error(log, "At least one stack trace should contain " + + TransactionDeadlockException.class.getSimpleName(), e); + } + } + } + }, 2, "tx-thread"); + + fut.get(); + + if (commitCnt.get() == 2) + return false; + + assertTrue(deadlock.get()); + + for (int i = 0; i < NODES_CNT ; i++) { + Ignite ignite = ignite(i); + + IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm(); + + Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures(); + + assertTrue(futs.isEmpty()); + } + + return true; + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Tx count. */ + private static volatile int TX_CNT; + + /** Tx ids. */ + private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>(); + + /** + * @param txCnt Tx count. + */ + private static void init(int txCnt) { + TX_CNT = txCnt; + TX_IDS.clear(); + } + + /** {@inheritDoc} */ + @Override public void sendMessage( + final ClusterNode node, + final Message msg, + final IgniteInClosure<IgniteException> ackC + ) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Message msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridNearTxPrepareRequest) { + final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0; + + GridCacheVersion txId = req.version(); + + if (TX_IDS.contains(txId)) { + while (TX_IDS.size() < TX_CNT) { + try { + U.sleep(50); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + } + } + } + else if (msg0 instanceof GridNearTxPrepareResponse) { + GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0; + + GridCacheVersion txId = res.version(); + + TX_IDS.add(txId); + } + } + + super.sendMessage(node, msg, ackC); + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java new file mode 100644 index 0000000..aa240aa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionDeadlockException; +import org.apache.ignite.transactions.TransactionTimeoutException; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.util.typedef.X.cause; +import static org.apache.ignite.internal.util.typedef.X.hasCause; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest { + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** Nodes count (actually two times more nodes will started: server + client). */ + private static final int NODES_CNT = 4; + + /** No op transformer. */ + private static final NoOpTransformer NO_OP_TRANSFORMER = new NoOpTransformer(); + + /** Wrapping transformer. */ + private static final WrappingTransformer WRAPPING_TRANSFORMER = new WrappingTransformer(); + + /** Client mode flag. */ + private static boolean client; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (isDebug()) { + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.failureDetectionTimeoutEnabled(false); + + cfg.setDiscoverySpi(discoSpi); + } + + TcpCommunicationSpi commSpi = new TestCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + client = false; + + startGrids(NODES_CNT); + + client = true; + + for (int i = 0; i < NODES_CNT; i++) + startGrid(i + NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testDeadlocksPartitioned() throws Exception { + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + doTestDeadlocks(createCache(PARTITIONED, syncMode, false), NO_OP_TRANSFORMER); + doTestDeadlocks(createCache(PARTITIONED, syncMode, false), WRAPPING_TRANSFORMER); + } + } + + /** + * @throws Exception If failed. + */ + public void testDeadlocksPartitionedNear() throws Exception { + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + doTestDeadlocks(createCache(PARTITIONED, syncMode, true), NO_OP_TRANSFORMER); + doTestDeadlocks(createCache(PARTITIONED, syncMode, true), WRAPPING_TRANSFORMER); + } + } + + /** + * @throws Exception If failed. + */ + public void testDeadlocksReplicated() throws Exception { + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + doTestDeadlocks(createCache(REPLICATED, syncMode, false), NO_OP_TRANSFORMER); + doTestDeadlocks(createCache(REPLICATED, syncMode, false), WRAPPING_TRANSFORMER); + } + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write sync mode. + * @param near Near. + * @return Created cache. + */ + @SuppressWarnings("unchecked") + private IgniteCache createCache(CacheMode cacheMode, CacheWriteSynchronizationMode syncMode, boolean near) { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(CACHE_NAME); + ccfg.setCacheMode(cacheMode); + ccfg.setBackups(1); + ccfg.setNearConfiguration(near ? new NearCacheConfiguration() : null); + ccfg.setWriteSynchronizationMode(syncMode); + + IgniteCache cache = ignite(0).createCache(ccfg); + + if (near) { + for (int i = 0; i < NODES_CNT; i++) { + Ignite client = ignite(i + NODES_CNT); + + assertTrue(client.configuration().isClientMode()); + + client.createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + } + } + + return cache; + } + + /** + * @param cache Cache. + * @param transformer Transformer closure. + * @throws Exception If failed. + */ + private void doTestDeadlocks(IgniteCache cache, IgniteClosure<Integer, Object> transformer) throws Exception { + try { + awaitPartitionMapExchange(); + + doTestDeadlock(3, false, true, true, transformer); + doTestDeadlock(3, false, false, false, transformer); + doTestDeadlock(3, false, false, true, transformer); + + doTestDeadlock(4, false, true, true, transformer); + doTestDeadlock(4, false, false, false, transformer); + doTestDeadlock(4, false, false, true, transformer); + } + catch (Throwable e) { + U.error(log, "Unexpected exception: ", e); + + fail(); + } + finally { + if (cache != null) + cache.destroy(); + } + } + + /** + * @throws Exception If failed. + */ + private void doTestDeadlock( + final int txCnt, + final boolean loc, + boolean lockPrimaryFirst, + final boolean clientTx, + final IgniteClosure<Integer, Object> transformer + ) throws Exception { + log.info(">>> Test deadlock [txCnt=" + txCnt + ", loc=" + loc + ", lockPrimaryFirst=" + lockPrimaryFirst + + ", clientTx=" + clientTx + ", transformer=" + transformer.getClass().getName() + ']'); + + TestCommunicationSpi.init(txCnt); + + final AtomicInteger threadCnt = new AtomicInteger(); + + final CyclicBarrier barrier = new CyclicBarrier(txCnt); + + final AtomicReference<TransactionDeadlockException> deadlockErr = new AtomicReference<>(); + + final List<List<Integer>> keySets = generateKeys(txCnt, loc, !lockPrimaryFirst); + + final Set<Integer> involvedKeys = new GridConcurrentHashSet<>(); + final Set<Integer> involvedLockedKeys = new GridConcurrentHashSet<>(); + final Set<IgniteInternalTx> involvedTxs = new GridConcurrentHashSet<>(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int threadNum = threadCnt.incrementAndGet(); + + Ignite ignite = loc ? ignite(0) : ignite(clientTx ? threadNum - 1 + txCnt : threadNum - 1); + + IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME); + + List<Integer> keys = keySets.get(threadNum - 1); + + int txTimeout = 500 + txCnt * 100; + + try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, txTimeout, 0)) { + IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx(); + + involvedTxs.add(tx0); + + Integer key = keys.get(0); + + involvedKeys.add(key); + + Object k; + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", key=" + transformer.apply(key) + ']'); + + cache.put(transformer.apply(key), 0); + + involvedLockedKeys.add(key); + + barrier.await(); + + key = keys.get(1); + + ClusterNode primaryNode = + ((IgniteCacheProxy)cache).context().affinity().primary(key, NONE); + + List<Integer> primaryKeys = + primaryKeys(grid(primaryNode).cache(CACHE_NAME), 5, key + (100 * threadNum)); + + Map<Object, Integer> entries = new HashMap<>(); + + involvedKeys.add(key); + + entries.put(transformer.apply(key), 0); + + for (Integer i : primaryKeys) { + involvedKeys.add(i); + + entries.put(transformer.apply(i), 1); + + k = transformer.apply(i + 13); + + involvedKeys.add(i + 13); + + entries.put(k, 2); + } + + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + + ", tx=" + tx + ", entries=" + entries + ']'); + + cache.putAll(entries); + + tx.commit(); + } + catch (Throwable e) { + U.error(log, "Expected exception: ", e); + + // At least one stack trace should contain TransactionDeadlockException. + if (hasCause(e, TransactionTimeoutException.class) && + hasCause(e, TransactionDeadlockException.class) + ) { + if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class))) + U.error(log, "At least one stack trace should contain " + + TransactionDeadlockException.class.getSimpleName(), e); + } + } + } + }, loc ? 2 : txCnt, "tx-thread"); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + U.error(null, "Unexpected exception", e); + + fail(); + } + + U.sleep(1000); + + TransactionDeadlockException deadlockE = deadlockErr.get(); + + assertNotNull(deadlockE); + + boolean fail = false; + + // Check transactions, futures and entry locks state. + for (int i = 0; i < NODES_CNT * 2; i++) { + Ignite ignite = ignite(i); + + int cacheId = ((IgniteCacheProxy)ignite.cache(CACHE_NAME)).context().cacheId(); + + GridCacheSharedContext<Object, Object> cctx = ((IgniteKernal)ignite).context().cache().context(); + + IgniteTxManager txMgr = cctx.tm(); + + Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions(); + + for (IgniteInternalTx tx : activeTxs) { + Collection<IgniteTxEntry> entries = tx.allEntries(); + + for (IgniteTxEntry entry : entries) { + if (entry.cacheId() == cacheId) { + fail = true; + + U.error(log, "Transaction still exists: " + "\n" + tx.xidVersion() + + "\n" + tx.nearXidVersion() + "\n nodeId=" + cctx.localNodeId() + "\n tx=" + tx); + } + } + } + + Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures(); + + assertTrue(futs.isEmpty()); + + GridCacheAdapter<Object, Integer> intCache = internalCache(i, CACHE_NAME); + + GridCacheConcurrentMap map = intCache.map(); + + for (Integer key : involvedKeys) { + Object key0 = transformer.apply(key); + + KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0); + + GridCacheMapEntry entry = map.getEntry(keyCacheObj); + + if (entry != null) + assertNull("Entry still has locks " + entry, entry.mvccAllLocal()); + } + } + + if (fail) + fail("Some transactions still exist"); + + // Check deadlock report + String msg = deadlockE.getMessage(); + + for (IgniteInternalTx tx : involvedTxs) + assertTrue(msg.contains( + "[txId=" + tx.xidVersion() + ", nodeId=" + tx.nodeId() + ", threadId=" + tx.threadId() + ']')); + + for (Integer key : involvedKeys) { + if (involvedLockedKeys.contains(key)) + assertTrue(msg.contains("[key=" + transformer.apply(key) + ", cache=" + CACHE_NAME + ']')); + else + assertFalse(msg.contains("[key=" + transformer.apply(key))); + } + } + + /** + * @param nodesCnt Nodes count. + * @param loc Local cache. + */ + private List<List<Integer>> generateKeys(int nodesCnt, boolean loc, boolean reverse) throws IgniteCheckedException { + List<List<Integer>> keySets = new ArrayList<>(); + + if (loc) { + List<Integer> keys = primaryKeys(ignite(0).cache(CACHE_NAME), 2); + + keySets.add(new ArrayList<>(keys)); + + Collections.reverse(keys); + + keySets.add(keys); + } + else { + for (int i = 0; i < nodesCnt; i++) { + List<Integer> keys = new ArrayList<>(2); + + int n1 = i + 1; + int n2 = n1 + 1; + + int i1 = n1 < nodesCnt ? n1 : n1 - nodesCnt; + int i2 = n2 < nodesCnt ? n2 : n2 - nodesCnt; + + keys.add(primaryKey(ignite(i1).cache(CACHE_NAME))); + keys.add(primaryKey(ignite(i2).cache(CACHE_NAME))); + + if (reverse) + Collections.reverse(keys); + + keySets.add(keys); + } + } + + return keySets; + } + + /** + * + */ + private static class NoOpTransformer implements IgniteClosure<Integer, Object> { + /** {@inheritDoc} */ + @Override public Object apply(Integer val) { + return val; + } + } + + /** + * + */ + private static class WrappingTransformer implements IgniteClosure<Integer, Object> { + /** {@inheritDoc} */ + @Override public Object apply(Integer val) { + return new KeyObject(val); + } + } + + /** + * + */ + private static class KeyObject implements Serializable { + /** Id. */ + private int id; + + /** Name. */ + private String name; + + /** + * @param id Id. + */ + public KeyObject(int id) { + this.id = id; + this.name = "KeyObject" + id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "KeyObject{" + + "id=" + id + + ", name='" + name + '\'' + + '}'; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + KeyObject obj = (KeyObject)o; + + return id == obj.id && name.equals(obj.name); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Tx count. */ + private static volatile int TX_CNT; + + /** Tx ids. */ + private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>(); + + /** + * @param txCnt Tx count. + */ + private static void init(int txCnt) { + TX_CNT = txCnt; + TX_IDS.clear(); + } + + /** {@inheritDoc} */ + @Override public void sendMessage( + final ClusterNode node, + final Message msg, + final IgniteInClosure<IgniteException> ackC + ) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Message msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridNearTxPrepareRequest) { + final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0; + + GridCacheVersion txId = req.version(); + + if (TX_IDS.contains(txId)) { + while (TX_IDS.size() < TX_CNT) { + try { + U.sleep(50); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + } + } + } + else if (msg0 instanceof GridNearTxPrepareResponse) { + GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0; + + GridCacheVersion txId = res.version(); + + TX_IDS.add(txId); + } + } + + super.sendMessage(node, msg, ackC); + } + } +}
