Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 52174ef77 -> 556a5dff6


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/556a5dff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/556a5dff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/556a5dff

Branch: refs/heads/ignite-zk
Commit: 556a5dff63534cb94faf41fb55ea0b112508d895
Parents: 52174ef
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Dec 8 15:29:37 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Dec 8 15:29:37 2017 +0300

----------------------------------------------------------------------
 .../internal/DiscoverySpiTestListener.java      |  27 ++--
 .../IgniteClientReconnectApiExceptionTest.java  |   6 +-
 .../IgniteClientReconnectCacheTest.java         |   8 +-
 .../CacheLateAffinityAssignmentTest.java        |   9 +-
 .../ignite/messaging/GridMessagingSelfTest.java | 126 ++++---------------
 .../ZookeeperDiscoverySpiBasicTest.java         |   4 +-
 6 files changed, 53 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
index 3e32f52..b79454c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
@@ -18,13 +18,15 @@
 package org.apache.ignite.internal;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
-import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
@@ -38,7 +40,7 @@ public class DiscoverySpiTestListener implements 
IgniteDiscoverySpiInternalListe
     private volatile CountDownLatch joinLatch;
 
     /** */
-    private boolean blockCustomEvt;
+    private Set<Class<?>> blockCustomEvtCls;
 
     /** */
     private final Object mux = new Object();
@@ -55,14 +57,14 @@ public class DiscoverySpiTestListener implements 
IgniteDiscoverySpiInternalListe
     /**
      *
      */
-    public void startBlock() {
+    public void startBlockJoin() {
         joinLatch = new CountDownLatch(1);
     }
 
     /**
      *
      */
-    public void stopBlock() {
+    public void stopBlockJoin() {
         joinLatch.countDown();
     }
 
@@ -88,10 +90,10 @@ public class DiscoverySpiTestListener implements 
IgniteDiscoverySpiInternalListe
         this.log = log;
 
         synchronized (mux) {
-            if (blockCustomEvt) {
+            if (blockCustomEvtCls != null) {
                 DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, 
"delegate");
 
-                if (msg0 instanceof CacheAffinityChangeMessage) {
+                if (blockCustomEvtCls.contains(msg0.getClass())) {
                     log.info("Block custom message: " + msg0);
 
                     blockedMsgs.add(msg);
@@ -105,14 +107,19 @@ public class DiscoverySpiTestListener implements 
IgniteDiscoverySpiInternalListe
 
         return true;
     }
+
     /**
-     *
+     * @param blockCustomEvtCls Event class to block.
      */
-    public void blockCustomEvent() {
+    public void blockCustomEvent(Class<?> cls0, Class<?> ... 
blockCustomEvtCls) {
         synchronized (mux) {
             assert blockedMsgs.isEmpty() : blockedMsgs;
 
-            blockCustomEvt = true;
+            this.blockCustomEvtCls = new HashSet<>();
+
+            this.blockCustomEvtCls.add(cls0);
+
+            Collections.addAll(this.blockCustomEvtCls, blockCustomEvtCls);
         }
     }
 
@@ -138,7 +145,7 @@ public class DiscoverySpiTestListener implements 
IgniteDiscoverySpiInternalListe
         synchronized (this) {
             msgs = new ArrayList<>(blockedMsgs);
 
-            blockCustomEvt = false;
+            blockCustomEvtCls = null;
 
             blockedMsgs.clear();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
index 310f58b..ed0c8b8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
@@ -785,7 +785,7 @@ public class IgniteClientReconnectApiExceptionTest extends 
IgniteClientReconnect
         DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
 
         clientSpi.setInternalListener(lsnr);
-        lsnr.startBlock();
+        lsnr.startBlockJoin();
 
         final List<IgniteInternalFuture> futs = new ArrayList<>();
 
@@ -830,7 +830,7 @@ public class IgniteClientReconnectApiExceptionTest extends 
IgniteClientReconnect
 
             log.info("Allow reconnect.");
 
-            lsnr.stopBlock();
+            lsnr.stopBlockJoin();
 
             waitReconnectEvent(reconnectLatch);
 
@@ -855,7 +855,7 @@ public class IgniteClientReconnectApiExceptionTest extends 
IgniteClientReconnect
             }
         }
         finally {
-            lsnr.stopBlock();
+            lsnr.stopBlockJoin();
 
             for (IgniteInternalFuture fut : futs)
                 fut.cancel();

http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 8aad001..084241b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -194,7 +194,7 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
 
         clientSpi.setInternalListener(lsnr);
 
-        lsnr.startBlock();
+        lsnr.startBlockJoin();
 
         final AtomicReference<IgniteInternalFuture> blockPutRef = new 
AtomicReference<>();
 
@@ -260,7 +260,7 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
 
         log.info("Allow reconnect.");
 
-        lsnr.stopBlock();
+        lsnr.stopBlockJoin();
 
         assertTrue(reconnectLatch.await(5000, MILLISECONDS));
 
@@ -432,7 +432,7 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
 
         clientSpi.setInternalListener(lsnr);
 
-        lsnr.startBlock();
+        lsnr.startBlockJoin();
 
         client.events().localListen(new IgnitePredicate<Event>() {
             @Override public boolean apply(Event evt) {
@@ -540,7 +540,7 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
 
         assertTrue(putFailed.await(5000, MILLISECONDS));
 
-        lsnr.stopBlock();
+        lsnr.stopBlockJoin();
 
         waitReconnectEvent(reconnectLatch);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 8853db8..8eef43c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -676,7 +677,7 @@ public class CacheLateAffinityAssignmentTest extends 
GridCommonAbstractTest {
 
             
((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
 
-            lsnr.blockCustomEvent();
+            lsnr.blockCustomEvent(CacheAffinityChangeMessage.class);
 
             stopGrid(1);
 
@@ -1420,7 +1421,7 @@ public class CacheLateAffinityAssignmentTest extends 
GridCommonAbstractTest {
 
         checkAffinity(2, topVer(2, 0), true);
 
-        lsnr.blockCustomEvent();
+        lsnr.blockCustomEvent(CacheAffinityChangeMessage.class);
 
         startServer(2, 3);
 
@@ -1469,7 +1470,7 @@ public class CacheLateAffinityAssignmentTest extends 
GridCommonAbstractTest {
 
             checkAffinity(3, topVer(3, 1), false);
 
-            lsnr.blockCustomEvent();
+            lsnr.blockCustomEvent(CacheAffinityChangeMessage.class);
 
             stopNode(2, 4);
 
@@ -1547,7 +1548,7 @@ public class CacheLateAffinityAssignmentTest extends 
GridCommonAbstractTest {
 
         blockSupplySend(spi, CACHE_NAME2);
 
-        lsnr.blockCustomEvent();
+        lsnr.blockCustomEvent(CacheAffinityChangeMessage.class);
 
         startServer(1, 2);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index 7541cec..a7c4521 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -24,7 +24,6 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,22 +36,20 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.DiscoverySpiTestListener;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
 import 
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
-import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
-import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -206,11 +203,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
-
-        discoSpi.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(discoSpi);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
         return cfg;
     }
@@ -1036,7 +1029,11 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
     public void testAsyncOld() throws Exception {
         final AtomicInteger msgCnt = new AtomicInteger();
 
-        TestTcpDiscoverySpi discoSpi = 
(TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+        IgniteDiscoverySpi discoSpi = 
(IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+
+        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+        discoSpi.setInternalListener(lsnr);
 
         assertFalse(ignite2.message().isAsync());
 
@@ -1054,7 +1051,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
-        discoSpi.blockCustomEvent();
+        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, 
StartRoutineDiscoveryMessageV2.class);
 
         final String topic = "topic";
 
@@ -1079,7 +1076,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
 
         Assert.assertFalse(starFut.isDone());
 
-        discoSpi.stopBlock();
+        lsnr.stopBlockCustomEvents();
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -1095,7 +1092,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
 
         Assert.assertTrue(starFut.isDone());
 
-        discoSpi.blockCustomEvent();
+        lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class);
 
         message(ignite1.cluster().forRemotes()).send(topic, "msg1");
 
@@ -1125,7 +1122,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
 
         Assert.assertFalse(stopFut.isDone());
 
-        discoSpi.stopBlock();
+        lsnr.stopBlockCustomEvents();
 
         stopFut.get();
 
@@ -1144,9 +1141,13 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
     public void testAsync() throws Exception {
         final AtomicInteger msgCnt = new AtomicInteger();
 
-        TestTcpDiscoverySpi discoSpi = 
(TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+        IgniteDiscoverySpi discoSpi = 
(IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi();
 
-        discoSpi.blockCustomEvent();
+        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+        discoSpi.setInternalListener(lsnr);
+
+        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, 
StartRoutineDiscoveryMessageV2.class);
 
         final String topic = "topic";
 
@@ -1167,7 +1168,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
 
         Assert.assertFalse(starFut.isDone());
 
-        discoSpi.stopBlock();
+        lsnr.stopBlockCustomEvents();
 
         UUID id = starFut.get();
 
@@ -1175,7 +1176,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
 
         Assert.assertTrue(starFut.isDone());
 
-        discoSpi.blockCustomEvent();
+        lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class);
 
         message(ignite1.cluster().forRemotes()).send(topic, "msg1");
 
@@ -1195,7 +1196,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
 
         Assert.assertFalse(stopFut.isDone());
 
-        discoSpi.stopBlock();
+        lsnr.stopBlockCustomEvents();
 
         stopFut.get();
 
@@ -1209,89 +1210,6 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
     }
 
     /**
-     *
-     */
-    static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
-        /** */
-        private boolean blockCustomEvt;
-
-        /** */
-        private final Object mux = new Object();
-
-        /** */
-        private List<DiscoverySpiCustomMessage> blockedMsgs = new 
ArrayList<>();
-
-        /** {@inheritDoc} */
-        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) 
throws IgniteException {
-            synchronized (mux) {
-                if (blockCustomEvt) {
-                    DiscoveryCustomMessage msg0 = 
GridTestUtils.getFieldValue(msg, "delegate");
-
-                    if (msg0 instanceof MappingProposedMessage || msg0 
instanceof MappingAcceptedMessage){
-                        super.sendCustomEvent(msg);
-
-                        return;
-                    }
-
-                    if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 
instanceof StartRoutineDiscoveryMessage) {
-                        log.info("Block custom message: " + msg0);
-
-                        blockedMsgs.add(msg);
-
-                        mux.notifyAll();
-
-                        return;
-                    }
-                }
-            }
-
-            super.sendCustomEvent(msg);
-        }
-
-        /**
-         *
-         */
-        public void blockCustomEvent() {
-            synchronized (mux) {
-                assert blockedMsgs.isEmpty() : blockedMsgs;
-
-                blockCustomEvt = true;
-            }
-        }
-
-        /**
-         * @throws InterruptedException If interrupted.
-         */
-        public void waitCustomEvent() throws InterruptedException {
-            synchronized (mux) {
-                while (blockedMsgs.isEmpty())
-                    mux.wait();
-            }
-        }
-
-        /**
-         *
-         */
-        public void stopBlock() {
-            List<DiscoverySpiCustomMessage> msgs;
-
-            synchronized (this) {
-                msgs = new ArrayList<>(blockedMsgs);
-
-                blockCustomEvt = false;
-
-                blockedMsgs.clear();
-            }
-
-            for (DiscoverySpiCustomMessage msg : msgs) {
-                log.info("Resend blocked message: " + msg);
-
-                super.sendCustomEvent(msg);
-            }
-        }
-    }
-
-    /**
      * Tests that message listener registers only for one oldest node.
      *
      * @throws Exception If an error occurred.

http://git-wip-us.apache.org/repos/asf/ignite/blob/556a5dff/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 455bf06..e9ae9be 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1995,7 +1995,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
                 
((IgniteDiscoverySpi)client.configuration().getDiscoverySpi()).setInternalListener(lsnr);
 
-                lsnr.startBlock();
+                lsnr.startBlockJoin();
 
                 lsnrs.add(lsnr);
             }
@@ -2056,7 +2056,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             disconnectedC.run();
 
             for (DiscoverySpiTestListener lsnr : lsnrs)
-                lsnr.stopBlock();
+                lsnr.stopBlockJoin();
         }
 
         waitReconnectEvent(log, reconnectLatch);

Reply via email to