IGNITE-2683
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1e59996 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1e59996 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1e59996 Branch: refs/heads/ignite-2683 Commit: c1e5999623021587ecd1f3e39f04ed037a5f1a59 Parents: 7ee8934 Author: Anton Vinogradov <[email protected]> Authored: Fri Feb 19 17:34:42 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Feb 19 17:34:42 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 6 +- .../deployment/GridDeploymentLocalStore.java | 2 +- .../cache/GridCacheDeploymentManager.java | 5 + .../continuous/GridContinuousProcessor.java | 2 +- .../GridCacheReplicatedPreloadSelfTest.java | 125 ++++++++++++++++++- 5 files changed, 134 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1e59996/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 2335262..36be9ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -585,7 +585,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (msg.topic() == null) { int topicOrd = msg.topicOrdinal(); - msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) : marsh.unmarshal(msg.topicBytes(), null)); + msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) : + marsh.unmarshal(msg.topicBytes(), U.resolveClassLoader(ctx.config()))); } if (!started) { @@ -1980,7 +1981,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // Unmarshall message topic if needed. if (msgTopic == null && msgTopicBytes != null) { - msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); + msgTopic = marsh.unmarshal(msgTopicBytes, + U.resolveClassLoader(dep != null ? dep.classLoader() : null, ctx.config())); ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1e59996/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java index d095efb..ab45708 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java @@ -174,7 +174,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { // Safety. if (ldr == null) - ldr = U.gridClassLoader(); + ldr = U.resolveClassLoader(ctx.config()); } if (ldr instanceof GridDeploymentClassLoader) { http://git-wip-us.apache.org/repos/asf/ignite/blob/c1e59996/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index 97d58dc..6cf6c22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@ -855,6 +855,11 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap } } + Class cls = super.loadClass(name); + + if (cls != null) + return cls; + throw new ClassNotFoundException("Failed to load class [name=" + name+ ", ctx=" + deps + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1e59996/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 5ba054a..9e684c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -287,7 +287,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (msg.data() == null && msg.dataBytes() != null) { try { - msg.data(marsh.unmarshal(msg.dataBytes(), null)); + msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config()))); } catch (IgniteCheckedException e) { U.error(log, "Failed to process message (ignoring): " + msg, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1e59996/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index cf8c867..425cbc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventAdapter; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; @@ -64,9 +65,12 @@ import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; +import static org.apache.ignite.events.EventType.EVTS_ALL; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; + /** * Tests for replicated cache preloader. */ @@ -97,6 +101,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** */ private static volatile CountDownLatch latch; + /** */ + private static boolean cutromEvt = false; + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -136,6 +143,16 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { if (isClient) cfg.setClientMode(true); + if (cutromEvt) { + int[] evts = new int[EVTS_ALL.length + 1]; + + evts[0] = Integer.MAX_VALUE - 1; + + System.arraycopy(EVTS_ALL, 0, evts, 1, EVTS_ALL.length); + + cfg.setIncludeEventTypes(evts); + } + return cfg; } @@ -500,15 +517,33 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { latch = new CountDownLatch(2); - rmtMsg.remoteListen("MyOrderedTopic", new Listener()); + rmtMsg.remoteListen("MyOrderedTopic", new MessageListener()); Object o = cls.newInstance(); o.toString(); rmtMsg.send("MyOrderedTopic", o); + rmtMsg.sendOrdered("MyOrderedTopic", o, 0); + + latch.await(); + + // Custom topic. + + final Class cls2 = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestEnumValue"); + + Object topic = cls2.getEnumConstants()[0]; + + latch = new CountDownLatch(2); + + rmtMsg.remoteListen(topic, new MessageListener()); + + rmtMsg.send(topic, topic); + rmtMsg.sendOrdered(topic, topic, 0); latch.await(); + } finally { useExtClassLoader = false; @@ -519,6 +554,82 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** * @throws Exception If test failed. */ + public void testExternalClassesAtEventP2pDisabled() throws Exception { + testExternalClassesAtEvent0(true); + + } + + /** + * @throws Exception If test failed. + */ + public void testExternalClassesAtEvent() throws Exception { + testExternalClassesAtEvent0(false); + } + + /** + * @throws Exception If test failed. + */ + private void testExternalClassesAtEvent0(boolean p2p) throws Exception { + try { + useExtClassLoader = true; + cutromEvt = true; + + if (p2p) + disableP2p = true; + + final Class cls = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue"); + final Class cls2 = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.GridEventConsumeFilter"); + + Ignite g1 = startGrid(1); + startGrid(2); + + latch = new CountDownLatch(3); + + g1.events().localListen((IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT); + g1.events().localListen(new EventListener(), EVT_CACHE_OBJECT_PUT); + + g1.events().remoteListen(null, (IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT); + g1.events().remoteListen(null, new EventListener(), EVT_CACHE_OBJECT_PUT); + + g1.cache(null).put("1", cls.newInstance()); + + latch.await(); + + final int usrType = Integer.MAX_VALUE - 1; + + latch = new CountDownLatch(4); + + g1.events().localListen(new EventListener(), usrType); + g1.events().localListen((IgnitePredicate)cls2.newInstance(), usrType); + g1.events().remoteListen(null, new EventListener(), usrType); + g1.events().remoteListen(null, (IgnitePredicate)cls2.newInstance(), usrType); + + g1.events().recordLocal(new EventAdapter(null, "Test message.", usrType) { + Object o = cls.newInstance(); + // No-op. + }); + + Collection<Event> evts = g1.events().remoteQuery(new EventListener(), 0, usrType); + evts = g1.events().remoteQuery((IgnitePredicate)cls2.newInstance(), 0, usrType); + evts = g1.events().localQuery(new EventListener(), 0, usrType); + evts = g1.events().localQuery((IgnitePredicate)cls2.newInstance(), 0, usrType); + + latch.await(); + } + finally { + useExtClassLoader = false; + cutromEvt = false; + + if (p2p) + disableP2p = false; + } + } + + /** + * @throws Exception If test failed. + */ public void testSync() throws Exception { preloadMode = SYNC; batchSize = 512; @@ -822,7 +933,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** * */ - private static class Listener implements P2<UUID, Object> { + private static class MessageListener implements P2<UUID, Object> { /** * @param nodeId * @param msg @@ -836,4 +947,14 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { return true; // Return true to continue listening. } } + + private static class EventListener implements IgnitePredicate<Event> { + @Override public boolean apply(Event evt) { + System.out.println("Cache event: " + evt); + + latch.countDown(); + + return true; + } + } }
