1523
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28a5247d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28a5247d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28a5247d Branch: refs/heads/ignite-1786 Commit: 28a5247d37ba65bdcb6119f97023f32d511b5f1e Parents: 0113506 Author: Anton Vinogradov <[email protected]> Authored: Tue Feb 2 15:37:12 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Feb 2 15:37:12 2016 +0300 ---------------------------------------------------------------------- .../internal/binary/BinaryEnumObjectImpl.java | 2 +- .../processors/cache/GridCacheProcessor.java | 2 +- .../ignite/internal/util/IgniteUtils.java | 7 ++ .../ignite/spi/discovery/tcp/ClientImpl.java | 6 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 11 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 1 - .../TcpDiscoveryCustomEventMessage.java | 10 +- .../GridCacheReplicatedPreloadSelfTest.java | 108 ++++++++++++++++++ .../tests/p2p/CacheDeploymentTestEnumValue.java | 47 ++++++++ .../p2p/CacheDeploymentTestStoreFactory.java | 113 +++++++++++++++++++ 10 files changed, 297 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java index fd4a4d8..ab76b6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java @@ -110,7 +110,7 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T deserialize() throws BinaryObjectException { - Class cls = BinaryUtils.resolveClass(ctx, typeId, clsName, null, true); + Class cls = BinaryUtils.resolveClass(ctx, typeId, clsName, ctx.configuration().getClassLoader(), true); return BinaryEnumCache.get(cls, ord); } http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 13048ec..26d3d4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -3421,7 +3421,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } try { - return marshaller.unmarshal(marshaller.marshal(val), val.getClass().getClassLoader()); + return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config().getClassLoader())); } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to validate cache configuration " + http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 0bf937d..6c0b8e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -2193,6 +2193,13 @@ public abstract class IgniteUtils { } /** + * @return Class loader passed as an argument or classloader used to load Ignite itself in case argument is null. + */ + public static ClassLoader resolveClassLoader(ClassLoader ldr) { + return ldr != null ? ldr : gridClassLoader; + } + + /** * @param parent Parent to find. * @param ldr Loader to check. * @return {@code True} if parent found. http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 850cc24..b12f7a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1665,7 +1665,8 @@ class ClientImpl extends TcpDiscoveryImpl { if (dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) - spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null); + spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), + U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); } locNode.setAttributes(msg.clientNodeAttributes()); @@ -1961,7 +1962,8 @@ class ClientImpl extends TcpDiscoveryImpl { if (node != null && node.visible()) { try { - DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh); + DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh, + spi.ignite().configuration().getClassLoader()); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); } http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index c69a611..0106b0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3600,7 +3600,8 @@ class ServerImpl extends TcpDiscoveryImpl { Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); if (data != null) - spi.onExchange(node.id(), node.id(), data, U.gridClassLoader()); + spi.onExchange(node.id(), node.id(), data, + U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id())); @@ -3679,7 +3680,8 @@ class ServerImpl extends TcpDiscoveryImpl { // Notify outside of synchronized block. if (dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) - spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader()); + spi.onExchange(node.id(), entry.getKey(), entry.getValue(), + U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); } processMessageFailedNodes(msg); @@ -4604,7 +4606,7 @@ class ServerImpl extends TcpDiscoveryImpl { DiscoverySpiCustomMessage msgObj = null; try { - msgObj = msg.message(spi.marsh); + msgObj = msg.message(spi.marsh, spi.ignite().configuration().getClassLoader()); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -4727,7 +4729,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (node != null) { try { - DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh); + DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh, + spi.ignite().configuration().getClassLoader()); lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index f9273c3..0d41cd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1700,7 +1700,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @param joiningNodeID Joining node ID. * @param nodeId Remote node ID for which data is provided. * @param data Collection of marshalled discovery data objects from different components. - * @param clsLdr Class loader for discovery data unmarshalling. */ protected void onExchange(UUID joiningNodeID, UUID nodeId, http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 8f14459..e10de46 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -74,8 +74,16 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage * @throws java.lang.Throwable if unmarshal failed. */ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable { + return message(marsh, null); + } + + /** + * @return Deserialized message, + * @throws java.lang.Throwable if unmarshal failed. + */ + @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable { if (msg == null) { - msg = marsh.unmarshal(msgBytes, U.gridClassLoader()); + msg = marsh.unmarshal(msgBytes, U.resolveClassLoader(ldr)); assert msg != null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index 5649b34..523f641 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import javax.cache.configuration.Factory; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CachePeekMode; @@ -70,6 +71,12 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { private int poolSize = 2; /** */ + private volatile boolean needStore = false; + + /** */ + private volatile boolean isClient = false; + + /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ @@ -104,6 +111,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { if (getTestGridName(1).equals(gridName) || cfg.getMarshaller() instanceof BinaryMarshaller) cfg.setClassLoader(getExternalClassLoader()); + if (isClient) + cfg.setClientMode(true); + return cfg; } @@ -122,6 +132,20 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { cacheCfg.setRebalanceBatchSize(batchSize); cacheCfg.setRebalanceThreadPoolSize(poolSize); + if (needStore) { + Object sf = null; + + try { + sf = getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + cacheCfg.setCacheStoreFactory((Factory)sf); + } + return cacheCfg; } @@ -233,6 +257,18 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") || grid(2).configuration().getMarshaller() instanceof BinaryMarshaller; + Object e1 = ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestEnumValue").getEnumConstants()[0]; + + cache1.put(2, e1); + + Object e2 = cache2.get(2); + + assert e2 != null; + assert e2.toString().equals(e1.toString()); + assert !e2.getClass().getClassLoader().equals(getClass().getClassLoader()); + assert e2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") || + grid(2).configuration().getMarshaller() instanceof BinaryMarshaller; + stopGrid(1); Ignite g3 = startGrid(3); @@ -259,6 +295,78 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** * @throws Exception If test failed. */ + public void testStore() throws Exception { + try { + Ignite g1 = startGrid(1); + + if (g1.configuration().getMarshaller() instanceof BinaryMarshaller) { + stopAllGrids(); + + needStore = true; + + g1 = startGrid(1); + + ClassLoader ldr = grid(1).configuration().getClassLoader(); + + CacheConfiguration cfg = defaultCacheConfiguration(); + + Ignite g2 = startGrid(2); // Checks deserialization at node join. + + isClient = true; + + Ignite g3 = startGrid(3); + + isClient = false; + + IgniteCache<Integer, Object> cache1 = g1.cache(null); + IgniteCache<Integer, Object> cache2 = g2.cache(null); + IgniteCache<Integer, Object> cache3 = g3.cache(null); + + cache1.put(1, 1); + + assertEquals(1, cache2.get(1)); + assertEquals(1, cache3.get(1)); + + needStore = false; + + stopAllGrids(); + + g1 = startGrid(1); + g2 = startGrid(2); + + isClient = true; + + g3 = startGrid(3); + + isClient = false; + + Object sf = ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance(); + + cfg.setCacheStoreFactory((Factory)sf); + cfg.setName("customStore"); + + cache1 = g1.createCache(cfg); + + cache2 = g2.getOrCreateCache(cfg); // Checks deserialization at cache creation. + cache3 = g3.getOrCreateCache(cfg); // Checks deserialization at cache creation. + + cache1.put(1, 1); + + assertEquals(1, cache2.get(1)); + assertEquals(1, cache3.get(1)); + } + } + finally { + needStore = false; + isClient = false; + + stopAllGrids(); + } + } + + /** + * @throws Exception If test failed. + */ public void testSync() throws Exception { preloadMode = SYNC; batchSize = 512; http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestEnumValue.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestEnumValue.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestEnumValue.java new file mode 100644 index 0000000..cc70e2d --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestEnumValue.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +/** + * Test enum for cache deployment tests. + */ +public enum CacheDeploymentTestEnumValue { + ONE("one"), + TWO("two"), + THREE("three"); + + /** */ + private String value; + + /** */ + CacheDeploymentTestEnumValue(String value) { + this.value = value; + } + + /** */ + public String getValue() { + return value; + } + + /** */ + @Override public String toString() { + return "CacheDeploymentTestEnumValue{" + + "value='" + value + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/28a5247d/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestStoreFactory.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestStoreFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestStoreFactory.java new file mode 100644 index 0000000..1a0fc17 --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentTestStoreFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.internal.processors.cache.store.CacheLocalStore; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** + * Test store factory for cache deployment tests. + */ +public class CacheDeploymentTestStoreFactory implements Factory<CacheStore<Integer, String>>, Serializable { + /** {@inheritDoc} */ + @Override public CacheStore<Integer, String> create() { + return new TestLocalStore(); + } + + /** + * + */ + @CacheLocalStore + public static class TestLocalStore<K, V> implements CacheStore<K, IgniteBiTuple<V, ?>>, Serializable { + /** */ + private Map<K, IgniteBiTuple<V, ?>> map = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, IgniteBiTuple<V, ?>> clo, @Nullable Object... args) + throws CacheLoaderException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteBiTuple<V, ?> load(K key) throws CacheLoaderException { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public Map<K, IgniteBiTuple<V, ?>> loadAll(Iterable<? extends K> keys) throws CacheLoaderException { + Map<K, IgniteBiTuple<V, ?>> res = new HashMap<>(); + + for (K key : keys) { + IgniteBiTuple<V, ?> val = map.get(key); + + if (val != null) + res.put(key, val); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>> entry) + throws CacheWriterException { + map.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>>> entries) + throws CacheWriterException { + for (Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>> e : entries) + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + map.remove(key); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + for (Object key : keys) + map.remove(key); + } + + /** + * Clear store. + */ + public void clear() { + map.clear(); + } + } +} \ No newline at end of file
