Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a900dc68 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a900dc68 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a900dc68 Branch: refs/heads/ignite-5024 Commit: a900dc685a24b21ac155796ecb027c60608b88f5 Parents: ca8ad03 Author: sboikov <[email protected]> Authored: Fri Apr 21 17:30:23 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Apr 21 17:30:23 2017 +0300 ---------------------------------------------------------------------- .../internal/TestRecordingCommunicationSpi.java | 29 ++- .../cache/IgniteOnePhaseCommitInvokeTest.java | 10 +- .../CacheLateAffinityAssignmentTest.java | 31 ++- ...heClientMultiNodeUpdateTopologyLockTest.java | 193 +++++++++++++++++++ .../IgniteCacheReadFromBackupTest.java | 15 +- .../IgniteTxCachePrimarySyncTest.java | 17 +- .../dht/IgniteCacheTxRecoveryRollbackTest.java | 17 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 183 ++---------------- .../ignite/testframework/GridTestNode.java | 7 + .../junits/common/GridCommonAbstractTest.java | 76 +++++++- 10 files changed, 353 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index aa0cc09..98d2553 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -59,7 +58,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { private Map<Class<?>, Set<String>> blockCls = new HashMap<>(); /** */ - private IgnitePredicate<GridIoMessage> blockP; + private IgniteBiPredicate<ClusterNode, Message> blockP; /** * @param node Node. @@ -75,16 +74,18 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { if (msg instanceof GridIoMessage) { GridIoMessage ioMsg = (GridIoMessage)msg; - Object msg0 = ioMsg.message(); + Message msg0 = ioMsg.message(); synchronized (this) { - if ((recordClasses != null && recordClasses.contains(msg0.getClass())) || - (recordP != null && recordP.apply(node, msg))) + boolean record = (recordClasses != null && recordClasses.contains(msg0.getClass())) || + (recordP != null && recordP.apply(node, msg0)); + + if (record) recordedMsgs.add(msg0); boolean block = false; - if (blockP != null && blockP.apply(ioMsg)) + if (blockP != null && blockP.apply(node, msg0)) block = true; else { Set<String> blockNodes = blockCls.get(msg0.getClass()); @@ -106,6 +107,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { return; } + else if (record) + notifyAll(); } } @@ -166,7 +169,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { * @param nodeName Node name. * @throws InterruptedException If interrupted. */ - public void waitForMessage(Class<?> cls, String nodeName) throws InterruptedException { + public void waitForBlocked(Class<?> cls, String nodeName) throws InterruptedException { synchronized (this) { while (!hasMessage(cls, nodeName)) wait(); @@ -174,6 +177,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** + * @throws InterruptedException If interrupted. + */ + public void waitForRecorded() throws InterruptedException { + synchronized (this) { + while (recordedMsgs.isEmpty()) + wait(); + } + } + + /** * @param cls Message class. * @param nodeName Node name. * @return {@code True} if has blocked message. @@ -191,7 +204,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** * @param blockP Message block predicate. */ - public void blockMessages(IgnitePredicate<GridIoMessage> blockP) { + public void blockMessages(IgniteBiPredicate<ClusterNode, Message> blockP) { synchronized (this) { this.blockP = blockP; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java index 601c067..a5cb3f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java @@ -21,17 +21,17 @@ import java.util.concurrent.Callable; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -127,10 +127,8 @@ public class IgniteOnePhaseCommitInvokeTest extends GridCommonAbstractTest { final Ignite clientNode = startGrid(1); - TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg0) { - Message msg = msg0.message(); - + TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { return msg instanceof GridDhtPartitionSupplyMessage && ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(CACHE_NAME); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index a74117c..c68c8d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; @@ -1082,12 +1083,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); - spi.blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg) { - Message msg0 = msg.message(); - - return msg0.getClass().equals(GridDhtPartitionsSingleMessage.class) || - msg0.getClass().equals(GridDhtPartitionsFullMessage.class); + spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) || + msg.getClass().equals(GridDhtPartitionsFullMessage.class); } }); } @@ -1710,14 +1709,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { @Override public TestRecordingCommunicationSpi apply(String s) { TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi(); - spi.blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg) { - Message msg0 = msg.message(); - - if (msg0 instanceof GridDhtForceKeysRequest || msg0 instanceof GridDhtForceKeysResponse) { + spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridDhtForceKeysRequest || msg instanceof GridDhtForceKeysResponse) { fail.set(true); - U.dumpStack(log, "Unexpected message: " + msg0); + U.dumpStack(log, "Unexpected message: " + msg); } return false; @@ -2011,14 +2008,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { * @param cacheName Cache name. */ private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) { - spi.blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage ioMsg) { - if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class)) + spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class)) return false; - GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message(); - - return msg.cacheId() == CU.cacheId(cacheName); + return ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(cacheName); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java new file mode 100644 index 0000000..4adf5f4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class IgniteCacheClientMultiNodeUpdateTopologyLockTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String TEST_CACHE = "testCache"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setConsistentId(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTx() throws Exception { + startGrids(3); + + client = true; + + Ignite clientNode = startGrid(3); + + client = false; + + IgniteCache<Integer, Integer> cache = clientNode.createCache(cacheConfiguration(0, FULL_SYNC)); + + awaitPartitionMapExchange(); + + Integer key1 = movingKeysAfterJoin(ignite(1), TEST_CACHE, 1).get(0); + Integer key2 = movingKeysAfterJoin(ignite(2), TEST_CACHE, 1).get(0); + + log.info("Start tx [key1=" + key1 + ", key2=" + key2 + ']'); + + IgniteInternalFuture<?> startFut; + + TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(ignite(2)); + + TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(clientNode); + + final UUID node0Id = ignite(0).cluster().localNode().id(); + final UUID node2Id = ignite(2).cluster().localNode().id(); + + spi2.record(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (!node0Id.equals(node.id())) + return false; + + return (msg instanceof GridDhtPartitionsSingleMessage) && + ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null; + } + }); + + clientSpi.record(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (!node2Id.equals(node.id())) + return false; + + if (msg instanceof GridNearTxFinishRequest) { + log.info("Delay message [msg=" + msg + ']'); + + try { + Thread.sleep(5000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("Send delayed message [msg=" + msg + ']'); + } + + return false; + } + }); + + try (Transaction tx = clientNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + + startFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + startGrid(4); + + return null; + } + }, "start-thread"); + + spi2.waitForRecorded(); + + U.sleep(5); + + cache.put(key2, 2); + + log.info("Commit tx"); + + tx.commit(); + } + + assertEquals((Integer)1, cache.get(key1)); + assertEquals((Integer)2, cache.get(key2)); + + startFut.get(); + + assertEquals((Integer)1, cache.get(key1)); + assertEquals((Integer)2, cache.get(key2)); + + awaitPartitionMapExchange(); + + assertEquals((Integer)1, cache.get(key1)); + assertEquals((Integer)2, cache.get(key2)); + } + + /** + * @param backups Number of backups. + * @param writeSync Cache write synchronization mode. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups, + CacheWriteSynchronizationMode writeSync) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setName(TEST_CACHE); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(writeSync); + ccfg.setBackups(backups); + ccfg.setRebalanceMode(ASYNC); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java index 29c2af6..42de613 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java @@ -33,16 +33,17 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -195,14 +196,12 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); - spi.blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage ioMsg) { - if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class)) + spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class)) return false; - GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message(); - - return msg.cacheId() == CU.cacheId(ccfg.getName()); + return ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(ccfg.getName()); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java index 8a1d4a7..91e6cf6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java @@ -29,7 +29,6 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.Affinity; @@ -41,7 +40,6 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; @@ -50,10 +48,11 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -213,9 +212,9 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest { commSpi0.record(GridDhtTxFinishRequest.class); - commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() { - @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException { - return e.message() instanceof GridDhtTxFinishRequest; + commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridDhtTxFinishRequest; } }); @@ -466,9 +465,9 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest { commSpi0.record(GridDhtTxFinishRequest.class); - commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() { - @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException { - return e.message() instanceof GridDhtTxFinishRequest; + commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridDhtTxFinishRequest; } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java index cfe9029..7e7d341 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java @@ -32,13 +32,13 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -46,7 +46,8 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -189,7 +190,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest { assertFalse(fut.isDone()); - testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name()); + testSpi(client2).waitForBlocked(GridNearTxFinishRequest.class, srv0.name()); stopGrid(client2.name()); @@ -264,9 +265,9 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest { testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name()); - testSpi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg) { - return msg.message() instanceof GridDhtTxFinishRequest; + testSpi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridDhtTxFinishRequest; } }); @@ -292,7 +293,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest { assertFalse(fut.isDone()); - testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name()); + testSpi(client2).waitForBlocked(GridNearTxFinishRequest.class, srv0.name()); stopGrid(client2.name()); stopGrid(srv0.name()); @@ -397,7 +398,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest { assertFalse(fut.isDone()); - testSpi(srv0).waitForMessage(GridNearTxPrepareResponse.class, client.name()); + testSpi(srv0).waitForBlocked(GridNearTxPrepareResponse.class, client.name()); stopGrid(client.name()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index 5a6b1c8..591858a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -17,14 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; @@ -32,18 +29,11 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.AffinityFunctionContext; -import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -51,15 +41,14 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; @@ -112,12 +101,10 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { */ private void blockRebalance() { for (Ignite node : G.allGrids()) { - testSpi(node).blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg) { - Object msg0 = msg.message(); - - return (msg0 instanceof GridDhtPartitionSupplyMessage) - && ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE); + testSpi(node).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return (msg instanceof GridDhtPartitionSupplyMessage) + && ((GridCacheMessage)msg).cacheId() == CU.cacheId(TEST_CACHE); } }); } @@ -368,7 +355,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC)); - List<Integer> keys = getKeysMoved(srv0, TEST_CACHE, putAll ? 3 : 1); + List<Integer> keys = movingKeysAfterJoin(srv0, TEST_CACHE, putAll ? 10 : 1); testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name()); testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name()); @@ -663,9 +650,9 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); - testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg) { - return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest; + testSpi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridDhtAtomicAbstractUpdateRequest; } }); @@ -719,16 +706,16 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); if (fail0) { - testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg) { - return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest; + testSpi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridDhtAtomicAbstractUpdateRequest; } }); } if (fail1) { - testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() { - @Override public boolean apply(GridIoMessage msg) { - return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest; + testSpi(ignite(2)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridDhtAtomicAbstractUpdateRequest; } }); } @@ -825,68 +812,6 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { } /** - * Return list of keys that are primary for given node on given topology, - * but will not be primary after add one new node. - * - * @param ign Ignite. - * @param cacheName Cache name. - * @param size Number of keys. - * @return List of keys. - */ - private List<Integer> getKeysMoved(Ignite ign, String cacheName, int size) { - GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context(); - - ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes()); - - AffinityFunction func = cctx.config().getAffinity(); - - AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl( - nodes, - null, - null, - new AffinityTopologyVersion(1, 0), - cctx.config().getBackups()); - - List<List<ClusterNode>> calcAff = func.assignPartitions(ctx); - - String name = getTestIgniteInstanceName(nodes.size()); - - nodes.add(new FakeNode(name)); - - ctx = new GridAffinityFunctionContextImpl( - nodes, - null, - null, - new AffinityTopologyVersion(1, 0), - cctx.config().getBackups()); - - List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx); - - Set<Integer> movedParts = new HashSet<>(); - - UUID localId = ign.cluster().localNode().id(); - - for (int i = 0; i < calcAff.size(); i++) { - if (calcAff.get(i).get(0).id().equals(localId) && !calcAff2.get(i).get(0).id().equals(localId)) - movedParts.add(i); - } - - List<Integer> keys = new ArrayList<>(); - - for (int i = 0; i < 10000; i++) { - int keyPart = func.partition(ign.affinity(cacheName).affinityKey(i)); - - if (movedParts.contains(keyPart)) - keys.add(i); - - if (keys.size() == size) - break; - } - - return keys; - } - - /** * */ public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> { @@ -908,80 +833,4 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { return null; } } - - /** - * - */ - public static class FakeNode implements ClusterNode { - /** */ - private final String consistendId; - /** */ - private final UUID uuid; - - /** */ - public FakeNode(String consistendId) { - this.consistendId = consistendId; - uuid = UUID.randomUUID(); - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return uuid; - } - - /** {@inheritDoc} */ - @Override public Object consistentId() { - return consistendId; - } - - /** {@inheritDoc} */ - @Nullable @Override public <T> T attribute(String name) { - return null; - } - - /** {@inheritDoc} */ - @Override public ClusterMetrics metrics() { - return null; - } - - /** {@inheritDoc} */ - @Override public Map<String, Object> attributes() { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<String> addresses() { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<String> hostNames() { - return null; - } - - /** {@inheritDoc} */ - @Override public long order() { - return 0; - } - - /** {@inheritDoc} */ - @Override public IgniteProductVersion version() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isLocal() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isDaemon() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isClient() { - return false; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index d331387..cefb774 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -103,6 +103,13 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod return id; } + /** + * @param consistentId Consistent ID. + */ + public void consistentId(Object consistentId) { + this.consistentId = consistentId; + } + /** {@inheritDoc} */ @Override public Object consistentId() { return consistentId; http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index c76c83e..e6b30e0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -46,10 +46,10 @@ import org.apache.ignite.IgniteEvents; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.Ignition; -import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; @@ -64,7 +64,9 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheExplicitLockSpan; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -91,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.GridAbstractTest; import org.apache.ignite.transactions.Transaction; @@ -1117,6 +1120,77 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } /** + * Return list of keys that are primary for given node on current topology, + * but primary node will change after new node will be added. + * + * @param ign Ignite. + * @param cacheName Cache name. + * @param size Number of keys. + * @return List of keys. + */ + protected final List<Integer> movingKeysAfterJoin(Ignite ign, String cacheName, int size) { + assertEquals("Expected consistentId is set to node name", ign.name(), ign.cluster().localNode().consistentId()); + + GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context(); + + ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes()); + + AffinityFunction func = cctx.config().getAffinity(); + + AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl( + nodes, + null, + null, + AffinityTopologyVersion.NONE, + cctx.config().getBackups()); + + List<List<ClusterNode>> calcAff = func.assignPartitions(ctx); + + GridTestNode fakeNode = new GridTestNode(UUID.randomUUID(), null); + + fakeNode.consistentId(getTestIgniteInstanceName(nodes.size())); + + nodes.add(fakeNode); + + ctx = new GridAffinityFunctionContextImpl( + nodes, + null, + null, + AffinityTopologyVersion.NONE, + cctx.config().getBackups()); + + List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx); + + Set<Integer> movedParts = new HashSet<>(); + + UUID locId = ign.cluster().localNode().id(); + + for (int i = 0; i < calcAff.size(); i++) { + if (calcAff.get(i).get(0).id().equals(locId) && !calcAff2.get(i).get(0).id().equals(locId)) + movedParts.add(i); + } + + List<Integer> keys = new ArrayList<>(); + + Affinity<Integer> aff = ign.affinity(cacheName); + + for (int i = 0; i < 10_000; i++) { + int keyPart = aff.partition(i); + + if (movedParts.contains(keyPart)) { + keys.add(i); + + if (keys.size() == size) + break; + } + } + + assertEquals("Failed to find moving keys [movedPats=" + movedParts + ", keys=" + keys + ']', size, keys.size()); + + return keys; + } + + /** * @param cache Cache. * @return Collection of keys for which given cache is primary. * @throws IgniteCheckedException If failed.
