IGNITE-9129 P2P class deployment with ZK discovery fixed. Signed-off-by: agura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e9bb1c0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e9bb1c0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e9bb1c0 Branch: refs/heads/ignite-8446 Commit: 0e9bb1c0e50ed8a29ccd3d6a9e2bd5311b6b7912 Parents: 0b39405 Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Thu Aug 2 16:18:44 2018 +0300 Committer: agura <ag...@apache.org> Committed: Thu Aug 2 16:19:24 2018 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 72 +++++++++++--------- .../GridP2PContinuousDeploymentSelfTest.java | 70 ++++++++++++++++++- .../zk/ZookeeperDiscoverySpiTestSuite2.java | 3 + 3 files changed, 109 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e9bb1c0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 2d48b7d..6723ea4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -544,7 +544,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param routineInfo Routine info. */ private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) { - IgnitePredicate<ClusterNode> nodeFilter = null; + IgnitePredicate<ClusterNode> nodeFilter; try { if (routineInfo.nodeFilter != null) { @@ -552,6 +552,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.resource().injectGeneric(nodeFilter); } + else + nodeFilter = null; } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [" + @@ -561,45 +563,47 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return; } - if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) { - GridContinuousHandler hnd; + ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe(() -> { + if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) { + GridContinuousHandler hnd; - try { - hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config())); + try { + hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config())); - if (ctx.config().isPeerClassLoadingEnabled()) - hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" + - "routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); - return; - } + return; + } - try { - registerHandler(routineInfo.srcNodeId, - routineInfo.routineId, - hnd, - routineInfo.bufSize, - routineInfo.interval, - routineInfo.autoUnsubscribe, - false); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to register continuous routine handler, ignore routine [" + - "routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + try { + registerHandler(routineInfo.srcNodeId, + routineInfo.routineId, + hnd, + routineInfo.bufSize, + routineInfo.interval, + routineInfo.autoUnsubscribe, + false); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to register continuous routine handler, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + } } - } - else { - if (log.isDebugEnabled()) { - log.debug("Do not register continuous routine, rejected by node filter [" + - "routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']'); + else { + if (log.isDebugEnabled()) { + log.debug("Do not register continuous routine, rejected by node filter [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']'); + } } - } + })); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e9bb1c0/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java index 4ea9037..28dab3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PContinuousDeploymentSelfTest.java @@ -17,9 +17,14 @@ package org.apache.ignite.p2p; +import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -49,6 +54,12 @@ public class GridP2PContinuousDeploymentSelfTest extends GridCommonAbstractTest /** Second test task name. */ private static final String TEST_TASK_2 = "org.apache.ignite.tests.p2p.GridP2PContinuousDeploymentTask2"; + /** Test predicate. */ + private static final String TEST_PREDICATE = "org.apache.ignite.tests.p2p.GridEventConsumeFilter"; + + /** Client mode. */ + private boolean clientMode; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -66,6 +77,11 @@ public class GridP2PContinuousDeploymentSelfTest extends GridCommonAbstractTest cfg.setDiscoverySpi(disco); + cfg.setPeerClassLoadingEnabled(true); + + if (clientMode) + cfg.setClientMode(true); + return cfg; } @@ -85,8 +101,13 @@ public class GridP2PContinuousDeploymentSelfTest extends GridCommonAbstractTest } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGridsMultiThreaded(GRID_CNT); + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); } /** @@ -94,6 +115,8 @@ public class GridP2PContinuousDeploymentSelfTest extends GridCommonAbstractTest */ @SuppressWarnings("unchecked") public void testDeployment() throws Exception { + startGridsMultiThreaded(GRID_CNT); + Ignite ignite = startGrid(IGNITE_INSTANCE_NAME); Class cls = getExternalClassLoader().loadClass(TEST_TASK_1); @@ -110,4 +133,47 @@ public class GridP2PContinuousDeploymentSelfTest extends GridCommonAbstractTest stopGrid(IGNITE_INSTANCE_NAME); } + + /** + * Tests that server node joins correctly to existing cluster if it has deployed user class with enabled P2P. + * + * @throws Exception If failed. + */ + public void testServerJoinWithP2PClassDeployedInCluster() throws Exception { + startGrids(GRID_CNT); + + ClassLoader extLdr = getExternalClassLoader(); + + clientMode = true; + + Ignite client = startGrid(2); + + Class<?> cls = extLdr.loadClass(TEST_PREDICATE); + + client.events().remoteListen( + new IgniteBiPredicate<UUID, Event>() { + @Override public boolean apply(UUID uuid, Event event) { + return true; + } + }, + (IgnitePredicate<Event>) cls.newInstance(), + EventType.EVT_CACHE_OBJECT_PUT + ); + + clientMode = false; + + Ignite srv = startGrid(3); + + srv.events().remoteListen( + new IgniteBiPredicate<UUID, Event>() { + @Override public boolean apply(UUID uuid, Event event) { + return true; + } + }, + (IgnitePredicate<Event>) cls.newInstance(), + EventType.EVT_CACHE_OBJECT_PUT + ); + + awaitPartitionMapExchange(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0e9bb1c0/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java index ddb003b..012366f 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite import org.apache.ignite.internal.processors.cache.multijvm.GridCacheAtomicMultiJvmFullApiSelfTest; import org.apache.ignite.internal.processors.cache.multijvm.GridCachePartitionedMultiJvmFullApiSelfTest; import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest; +import org.apache.ignite.p2p.GridP2PContinuousDeploymentSelfTest; import org.apache.ignite.util.GridCommandHandlerTest; /** @@ -91,6 +92,8 @@ public class ZookeeperDiscoverySpiTestSuite2 extends ZookeeperDiscoverySpiAbstra suite.addTestSuite(GridCommandHandlerTest.class); + suite.addTestSuite(GridP2PContinuousDeploymentSelfTest.class); + return suite; } }