Repository: ignite
Updated Branches:
  refs/heads/ignite-2683 [created] 7fc3da3a6
IGNITE-2683 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ee89347 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ee89347 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ee89347 Branch: refs/heads/ignite-2683 Commit: 7ee8934709cca19843b581bf95eb001150284bf4 Parents: 2bb4499 Author: Anton Vinogradov <[email protected]> Authored: Thu Feb 18 16:30:54 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Thu Feb 18 16:30:54 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 3 +- .../processors/cache/GridCacheProcessor.java | 2 +- .../ignite/internal/util/IgniteUtils.java | 22 ++++++- .../ignite/spi/discovery/tcp/ClientImpl.java | 4 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 8 +-- .../TcpDiscoveryCustomEventMessage.java | 2 +- .../GridCacheReplicatedPreloadSelfTest.java | 63 ++++++++++++++++++ .../CacheDeploymentExternalizableTestValue.java | 68 ++++++++++++++++++++ 8 files changed, 160 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 42f8dae..2335262 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1989,7 +1989,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; if (msgBody == null) { - 
msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); + msgBody = marsh.unmarshal(ioMsg.bodyBytes(), + U.resolveClassLoader(dep != null ? dep.classLoader() : null, ctx.config())); ioMsg.body(msgBody); // Save body to avoid future unmarshallings. } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index d485d41..c5da94d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -3443,7 +3443,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } try { - return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config().getClassLoader())); + return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config())); } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to validate cache configuration " + http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index a6b28fd..0822e64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -2190,10 +2190,26 @@ public abstract class IgniteUtils { } /** - * @return 
Class loader passed as an argument or classloader used to load Ignite itself in case argument is null. + * @return ClassLoader at IgniteConfiguration in case it is not null or + * ClassLoader used to start Ignite. */ - public static ClassLoader resolveClassLoader(ClassLoader ldr) { - return ldr != null ? ldr : gridClassLoader; + public static ClassLoader resolveClassLoader(IgniteConfiguration cfg) { + return resolveClassLoader(null, cfg); + } + + /** + * @return ClassLoader passed as param in case it is not null or + * ClassLoader at IgniteConfiguration in case it is not null or + * ClassLoader used to start Ignite. + */ + public static ClassLoader resolveClassLoader(ClassLoader ldr, IgniteConfiguration cfg) { + assert cfg != null; + + return ldr != null ? + ldr : + cfg.getClassLoader() != null ? + cfg.getClassLoader() : + gridClassLoader; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index b12f7a6..2b00a02 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1666,7 +1666,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), - U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); + U.resolveClassLoader(spi.ignite().configuration())); } locNode.setAttributes(msg.clientNodeAttributes()); @@ -1963,7 +1963,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (node != null && node.visible()) { try { DiscoverySpiCustomMessage msgObj = 
msg.message(spi.marsh, - spi.ignite().configuration().getClassLoader()); + U.resolveClassLoader(spi.ignite().configuration())); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index fa0ae1c..3c8d6c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3601,7 +3601,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (data != null) spi.onExchange(node.id(), node.id(), data, - U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); + U.resolveClassLoader(spi.ignite().configuration())); msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id())); @@ -3681,7 +3681,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) spi.onExchange(node.id(), entry.getKey(), entry.getValue(), - U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); + U.resolveClassLoader(spi.ignite().configuration())); } processMessageFailedNodes(msg); @@ -4608,7 +4608,7 @@ class ServerImpl extends TcpDiscoveryImpl { DiscoverySpiCustomMessage msgObj = null; try { - msgObj = msg.message(spi.marsh, spi.ignite().configuration().getClassLoader()); + msgObj = msg.message(spi.marsh, U.resolveClassLoader(spi.ignite().configuration())); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -4735,7 +4735,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (node != null) { try { DiscoverySpiCustomMessage msgObj = 
msg.message(spi.marsh, - spi.ignite().configuration().getClassLoader()); + U.resolveClassLoader(spi.ignite().configuration())); lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 9064080..2c759a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -77,7 +77,7 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage */ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable { if (msg == null) { - msg = marsh.unmarshal(msgBytes, U.resolveClassLoader(ldr)); + msg = marsh.unmarshal(msgBytes, ldr); assert msg != null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index 1fae875..cf8c867 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -26,12 +26,14 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Factory; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.event.CacheEntryListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteMessaging; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheRebalanceMode; @@ -47,6 +49,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.CachePluginConfiguration; @@ -86,9 +89,14 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** */ private volatile boolean useExtClassLoader = false; + private volatile boolean disableP2p = false; + /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** */ + private static volatile CountDownLatch latch; + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -118,6 +126,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { 
cfg.setEventStorageSpi(spi); + if (disableP2p) + cfg.setPeerClassLoadingEnabled(false); + if (getTestGridName(1).equals(gridName) || useExtClassLoader || cfg.getMarshaller() instanceof BinaryMarshaller) cfg.setClassLoader(getExternalClassLoader()); @@ -474,6 +485,40 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** * @throws Exception If test failed. */ + public void testExternalClassesAtMessage() throws Exception { + try { + useExtClassLoader = true; + disableP2p = true; + + final Class cls = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue"); + + Ignite g1 = startGrid(1); + startGrid(2); + + IgniteMessaging rmtMsg = g1.message(); + + latch = new CountDownLatch(2); + + rmtMsg.remoteListen("MyOrderedTopic", new Listener()); + + Object o = cls.newInstance(); + + o.toString(); + + rmtMsg.send("MyOrderedTopic", o); + + latch.await(); + } + finally { + useExtClassLoader = false; + disableP2p = false; + } + } + + /** + * @throws Exception If test failed. + */ public void testSync() throws Exception { preloadMode = SYNC; batchSize = 512; @@ -773,4 +818,22 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { // No-op. } } + + /** + * + */ + private static class Listener implements P2<UUID, Object> { + /** + * @param nodeId + * @param msg + * @return + */ + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println("Received message [msg=" + msg + ", from=" + nodeId + ']'); + + latch.countDown(); + + return true; // Return true to continue listening. 
+ } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ee89347/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java new file mode 100644 index 0000000..573ad61 --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; + +/** + * Test value for deployment. 
+ */ +public class CacheDeploymentExternalizableTestValue implements Serializable { + /** */ + private CacheDeploymentExternalizableTestValue2 field; + + /** + * @return + */ + public CacheDeploymentExternalizableTestValue2 getField() { + return field; + } + + /** + * @param field + */ + public void setField( + CacheDeploymentExternalizableTestValue2 field) { + this.field = field; + } + + @Override public String toString() { + field = new CacheDeploymentExternalizableTestValue2(); + + return super.toString(); + } + + /** + * + */ + public static class CacheDeploymentExternalizableTestValue2 implements Externalizable { + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + + } + } +} \ No newline at end of file
