Repository: ignite Updated Branches: refs/heads/ignite-zk 52174ef77 -> 556a5dff6
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/556a5dff Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/556a5dff Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/556a5dff Branch: refs/heads/ignite-zk Commit: 556a5dff63534cb94faf41fb55ea0b112508d895 Parents: 52174ef Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 8 15:29:37 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 8 15:29:37 2017 +0300 ---------------------------------------------------------------------- .../internal/DiscoverySpiTestListener.java | 27 ++-- .../IgniteClientReconnectApiExceptionTest.java | 6 +- .../IgniteClientReconnectCacheTest.java | 8 +- .../CacheLateAffinityAssignmentTest.java | 9 +- .../ignite/messaging/GridMessagingSelfTest.java | 126 ++++--------------- .../ZookeeperDiscoverySpiBasicTest.java | 4 +- 6 files changed, 53 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java index 3e32f52..b79454c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java @@ -18,13 +18,15 @@ package org.apache.ignite.internal; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; -import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; @@ -38,7 +40,7 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe private volatile CountDownLatch joinLatch; /** */ - private boolean blockCustomEvt; + private Set<Class<?>> blockCustomEvtCls; /** */ private final Object mux = new Object(); @@ -55,14 +57,14 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe /** * */ - public void startBlock() { + public void startBlockJoin() { joinLatch = new CountDownLatch(1); } /** * */ - public void stopBlock() { + public void stopBlockJoin() { joinLatch.countDown(); } @@ -88,10 +90,10 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe this.log = log; synchronized (mux) { - if (blockCustomEvt) { + if (blockCustomEvtCls != null) { DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); - if (msg0 instanceof CacheAffinityChangeMessage) { + if (blockCustomEvtCls.contains(msg0.getClass())) { log.info("Block custom message: " + msg0); blockedMsgs.add(msg); @@ -105,14 +107,19 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe return true; } + /** - * + * @param blockCustomEvtCls Event class to block. */ - public void blockCustomEvent() { + public void blockCustomEvent(Class<?> cls0, Class<?> ... blockCustomEvtCls) { synchronized (mux) { assert blockedMsgs.isEmpty() : blockedMsgs; - blockCustomEvt = true; + this.blockCustomEvtCls = new HashSet<>(); + + this.blockCustomEvtCls.add(cls0); + + Collections.addAll(this.blockCustomEvtCls, blockCustomEvtCls); } } @@ -138,7 +145,7 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe synchronized (this) { msgs = new ArrayList<>(blockedMsgs); - blockCustomEvt = false; + blockCustomEvtCls = null; blockedMsgs.clear(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java index 310f58b..ed0c8b8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java @@ -785,7 +785,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); clientSpi.setInternalListener(lsnr); - lsnr.startBlock(); + lsnr.startBlockJoin(); final List<IgniteInternalFuture> futs = new ArrayList<>(); @@ -830,7 +830,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect log.info("Allow reconnect."); - lsnr.stopBlock(); + lsnr.stopBlockJoin(); waitReconnectEvent(reconnectLatch); @@ -855,7 +855,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect } } finally { - lsnr.stopBlock(); + lsnr.stopBlockJoin(); for (IgniteInternalFuture fut : futs) fut.cancel(); http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 8aad001..084241b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -194,7 +194,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac clientSpi.setInternalListener(lsnr); - lsnr.startBlock(); + lsnr.startBlockJoin(); final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>(); @@ -260,7 +260,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Allow reconnect."); - lsnr.stopBlock(); + lsnr.stopBlockJoin(); assertTrue(reconnectLatch.await(5000, MILLISECONDS)); @@ -432,7 +432,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac clientSpi.setInternalListener(lsnr); - lsnr.startBlock(); + lsnr.startBlockJoin(); client.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { @@ -540,7 +540,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertTrue(putFailed.await(5000, MILLISECONDS)); - lsnr.stopBlock(); + lsnr.stopBlockJoin(); waitReconnectEvent(reconnectLatch); http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 8853db8..8eef43c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -676,7 +677,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); - lsnr.blockCustomEvent(); + lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); stopGrid(1); @@ -1420,7 +1421,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { checkAffinity(2, topVer(2, 0), true); - lsnr.blockCustomEvent(); + lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); startServer(2, 3); @@ -1469,7 +1470,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { checkAffinity(3, topVer(3, 1), false); - lsnr.blockCustomEvent(); + lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); stopNode(2, 4); @@ -1547,7 +1548,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { blockSupplySend(spi, CACHE_NAME2); - lsnr.blockCustomEvent(); + lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); startServer(1, 2); http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index 7541cec..a7c4521 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -24,7 +24,6 @@ import java.io.ObjectOutput; import java.io.Serializable; import java.net.URL; import java.net.URLClassLoader; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -37,22 +36,20 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.DiscoverySpiTestListener; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2; import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; -import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage; -import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -206,11 +203,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); return cfg; } @@ -1036,7 +1029,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser public void testAsyncOld() throws Exception { final AtomicInteger msgCnt = new AtomicInteger(); - TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); + IgniteDiscoverySpi discoSpi = (IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi(); + + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + discoSpi.setInternalListener(lsnr); assertFalse(ignite2.message().isAsync()); @@ -1054,7 +1051,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); - discoSpi.blockCustomEvent(); + lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); final String topic = "topic"; @@ -1079,7 +1076,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertFalse(starFut.isDone()); - discoSpi.stopBlock(); + lsnr.stopBlockCustomEvents(); GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { @@ -1095,7 +1092,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertTrue(starFut.isDone()); - discoSpi.blockCustomEvent(); + lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class); message(ignite1.cluster().forRemotes()).send(topic, "msg1"); @@ -1125,7 +1122,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertFalse(stopFut.isDone()); - discoSpi.stopBlock(); + lsnr.stopBlockCustomEvents(); stopFut.get(); @@ -1144,9 +1141,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser public void testAsync() throws Exception { final AtomicInteger msgCnt = new AtomicInteger(); - TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); + IgniteDiscoverySpi discoSpi = (IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi(); - discoSpi.blockCustomEvent(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + discoSpi.setInternalListener(lsnr); + + lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); final String topic = "topic"; @@ -1167,7 +1168,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertFalse(starFut.isDone()); - discoSpi.stopBlock(); + lsnr.stopBlockCustomEvents(); UUID id = starFut.get(); @@ -1175,7 +1176,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertTrue(starFut.isDone()); - discoSpi.blockCustomEvent(); + lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class); message(ignite1.cluster().forRemotes()).send(topic, "msg1"); @@ -1195,7 +1196,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertFalse(stopFut.isDone()); - discoSpi.stopBlock(); + lsnr.stopBlockCustomEvents(); stopFut.get(); @@ -1209,89 +1210,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } /** - * - */ - static class TestTcpDiscoverySpi extends TcpDiscoverySpi { - /** */ - private boolean blockCustomEvt; - - /** */ - private final Object mux = new Object(); - - /** */ - private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - synchronized (mux) { - if (blockCustomEvt) { - DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); - - if (msg0 instanceof MappingProposedMessage || msg0 instanceof MappingAcceptedMessage){ - super.sendCustomEvent(msg); - - return; - } - - if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 instanceof StartRoutineDiscoveryMessage) { - log.info("Block custom message: " + msg0); - - blockedMsgs.add(msg); - - mux.notifyAll(); - - return; - } - } - } - - super.sendCustomEvent(msg); - } - - /** - * - */ - public void blockCustomEvent() { - synchronized (mux) { - assert blockedMsgs.isEmpty() : blockedMsgs; - - blockCustomEvt = true; - } - } - - /** - * @throws InterruptedException If interrupted. - */ - public void waitCustomEvent() throws InterruptedException { - synchronized (mux) { - while (blockedMsgs.isEmpty()) - mux.wait(); - } - } - - /** - * - */ - public void stopBlock() { - List<DiscoverySpiCustomMessage> msgs; - - synchronized (this) { - msgs = new ArrayList<>(blockedMsgs); - - blockCustomEvt = false; - - blockedMsgs.clear(); - } - - for (DiscoverySpiCustomMessage msg : msgs) { - log.info("Resend blocked message: " + msg); - - super.sendCustomEvent(msg); - } - } - } - - /** * Tests that message listener registers only for one oldest node. * * @throws Exception If an error occurred. http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 455bf06..e9ae9be 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -1995,7 +1995,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { ((IgniteDiscoverySpi)client.configuration().getDiscoverySpi()).setInternalListener(lsnr); - lsnr.startBlock(); + lsnr.startBlockJoin(); lsnrs.add(lsnr); } @@ -2056,7 +2056,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { disconnectedC.run(); for (DiscoverySpiTestListener lsnr : lsnrs) - lsnr.stopBlock(); + lsnr.stopBlockJoin(); } waitReconnectEvent(log, reconnectLatch);