IGNITE-9129 P2P class deployment with ZK discovery fixed.

Signed-off-by: agura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e9bb1c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e9bb1c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e9bb1c0

Branch: refs/heads/ignite-8446
Commit: 0e9bb1c0e50ed8a29ccd3d6a9e2bd5311b6b7912
Parents: 0b39405
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Thu Aug 2 16:18:44 2018 +0300
Committer: agura <ag...@apache.org>
Committed: Thu Aug 2 16:19:24 2018 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     | 72 +++++++++++---------
 .../GridP2PContinuousDeploymentSelfTest.java    | 70 ++++++++++++++++++-
 .../zk/ZookeeperDiscoverySpiTestSuite2.java     |  3 +
 3 files changed, 109 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0e9bb1c0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 2d48b7d..6723ea4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -544,7 +544,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param routineInfo Routine info.
      */
     private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) {
-        IgnitePredicate<ClusterNode> nodeFilter = null;
+        IgnitePredicate<ClusterNode> nodeFilter;
 
         try {
             if (routineInfo.nodeFilter != null) {
@@ -552,6 +552,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
                 ctx.resource().injectGeneric(nodeFilter);
             }
+            else
+                nodeFilter = null;
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to unmarshal continuous routine filter, 
ignore routine [" +
@@ -561,45 +563,47 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             return;
         }
 
-        if (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode())) {
-            GridContinuousHandler hnd;
+        ctx.discovery().localJoinFuture().listen(f -> 
ctx.closure().runLocalSafe(() -> {
+            if (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode())) {
+                GridContinuousHandler hnd;
 
-            try {
-                hnd = U.unmarshal(marsh, routineInfo.hnd, 
U.resolveClassLoader(ctx.config()));
+                try {
+                    hnd = U.unmarshal(marsh, routineInfo.hnd, 
U.resolveClassLoader(ctx.config()));
 
-                if (ctx.config().isPeerClassLoadingEnabled())
-                    hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal continuous routine handler, 
ignore routine [" +
-                    "routineId=" + routineInfo.routineId +
-                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+                    if (ctx.config().isPeerClassLoadingEnabled())
+                        hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to unmarshal continuous routine 
handler, ignore routine [" +
+                        "routineId=" + routineInfo.routineId +
+                        ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
 
-                return;
-            }
+                    return;
+                }
 
-            try {
-                registerHandler(routineInfo.srcNodeId,
-                    routineInfo.routineId,
-                    hnd,
-                    routineInfo.bufSize,
-                    routineInfo.interval,
-                    routineInfo.autoUnsubscribe,
-                    false);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to register continuous routine handler, 
ignore routine [" +
-                    "routineId=" + routineInfo.routineId +
-                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+                try {
+                    registerHandler(routineInfo.srcNodeId,
+                        routineInfo.routineId,
+                        hnd,
+                        routineInfo.bufSize,
+                        routineInfo.interval,
+                        routineInfo.autoUnsubscribe,
+                        false);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to register continuous routine 
handler, ignore routine [" +
+                        "routineId=" + routineInfo.routineId +
+                        ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+                }
             }
-        }
-        else {
-            if (log.isDebugEnabled()) {
-                log.debug("Do not register continuous routine, rejected by 
node filter [" +
-                    "routineId=" + routineInfo.routineId +
-                    ", srcNodeId=" + routineInfo.srcNodeId + ']');
+            else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Do not register continuous routine, rejected by 
node filter [" +
+                        "routineId=" + routineInfo.routineId +
+                        ", srcNodeId=" + routineInfo.srcNodeId + ']');
+                }
             }
-        }
+        }));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e9bb1c0/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java
index 4ea9037..28dab3f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java
@@ -17,9 +17,14 @@
 
 package org.apache.ignite.p2p;
 
+import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -49,6 +54,12 @@ public class GridP2PContinuousDeploymentSelfTest extends 
GridCommonAbstractTest
     /** Second test task name. */
     private static final String TEST_TASK_2 = 
"org.apache.ignite.tests.p2p.GridP2PContinuousDeploymentTask2";
 
+    /** Test predicate. */
+    private static final String TEST_PREDICATE = 
"org.apache.ignite.tests.p2p.GridEventConsumeFilter";
+
+    /** Client mode. */
+    private boolean clientMode;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -66,6 +77,11 @@ public class GridP2PContinuousDeploymentSelfTest extends 
GridCommonAbstractTest
 
         cfg.setDiscoverySpi(disco);
 
+        cfg.setPeerClassLoadingEnabled(true);
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
         return cfg;
     }
 
@@ -85,8 +101,13 @@ public class GridP2PContinuousDeploymentSelfTest extends 
GridCommonAbstractTest
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        startGridsMultiThreaded(GRID_CNT);
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
     }
 
     /**
@@ -94,6 +115,8 @@ public class GridP2PContinuousDeploymentSelfTest extends 
GridCommonAbstractTest
      */
     @SuppressWarnings("unchecked")
     public void testDeployment() throws Exception {
+        startGridsMultiThreaded(GRID_CNT);
+
         Ignite ignite = startGrid(IGNITE_INSTANCE_NAME);
 
         Class cls = getExternalClassLoader().loadClass(TEST_TASK_1);
@@ -110,4 +133,47 @@ public class GridP2PContinuousDeploymentSelfTest extends 
GridCommonAbstractTest
 
         stopGrid(IGNITE_INSTANCE_NAME);
     }
+
+    /**
+     * Tests that server node joins correctly to existing cluster if it has 
deployed user class with enabled P2P.
+     *
+     * @throws Exception If failed.
+     */
+    public void testServerJoinWithP2PClassDeployedInCluster() throws Exception 
{
+        startGrids(GRID_CNT);
+
+        ClassLoader extLdr = getExternalClassLoader();
+
+        clientMode = true;
+
+        Ignite client = startGrid(2);
+
+        Class<?> cls = extLdr.loadClass(TEST_PREDICATE);
+
+        client.events().remoteListen(
+            new IgniteBiPredicate<UUID, Event>() {
+                @Override public boolean apply(UUID uuid, Event event) {
+                    return true;
+                }
+            },
+            (IgnitePredicate<Event>) cls.newInstance(),
+            EventType.EVT_CACHE_OBJECT_PUT
+        );
+
+        clientMode = false;
+
+        Ignite srv = startGrid(3);
+
+        srv.events().remoteListen(
+            new IgniteBiPredicate<UUID, Event>() {
+                @Override public boolean apply(UUID uuid, Event event) {
+                    return true;
+                }
+            },
+            (IgnitePredicate<Event>) cls.newInstance(),
+            EventType.EVT_CACHE_OBJECT_PUT
+        );
+
+        awaitPartitionMapExchange();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e9bb1c0/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
index ddb003b..012366f 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
 import 
org.apache.ignite.internal.processors.cache.multijvm.GridCacheAtomicMultiJvmFullApiSelfTest;
 import 
org.apache.ignite.internal.processors.cache.multijvm.GridCachePartitionedMultiJvmFullApiSelfTest;
 import 
org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
+import org.apache.ignite.p2p.GridP2PContinuousDeploymentSelfTest;
 import org.apache.ignite.util.GridCommandHandlerTest;
 
 /**
@@ -91,6 +92,8 @@ public class ZookeeperDiscoverySpiTestSuite2 extends 
ZookeeperDiscoverySpiAbstra
 
         suite.addTestSuite(GridCommandHandlerTest.class);
 
+        suite.addTestSuite(GridP2PContinuousDeploymentSelfTest.class);
+
         return suite;
     }
 }

Reply via email to