Repository: ignite Updated Branches: refs/heads/ignite-3220-1 ea2b19fd6 -> 43b64c06e
IGNITE-3618: Client can not load data after server restarts. This closes #941. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d0cbb45 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d0cbb45 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d0cbb45 Branch: refs/heads/ignite-3220-1 Commit: 1d0cbb45cd61c5c8e6ec926d7e629eb94111b32f Parents: ff3e00c Author: vd-pyatkov <vpyat...@gridgain.com> Authored: Thu Aug 11 08:43:50 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Aug 11 08:43:50 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/binary/BinaryContext.java | 7 + .../binary/CacheObjectBinaryProcessorImpl.java | 19 ++ .../ClientReconnectAfterClusterRestartTest.java | 225 +++++++++++++++++++ .../IgniteCacheWithIndexingTestSuite.java | 5 +- 4 files changed, 255 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java index d78c126..a603894 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java @@ -1252,6 +1252,13 @@ public class BinaryContext { } /** + * Unregister all binary schemas. + */ + public void unregisterBinarySchemas() { + schemas = null; + } + + /** * Returns instance of {@link OptimizedMarshaller}. * * @return Optimized marshaller. http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 6d980a8..0337874 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -50,6 +50,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.binary.BinaryContext; @@ -67,6 +68,7 @@ import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter; @@ -103,6 +105,7 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.IgniteSystemProperties.getBoolean; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; /** * Binary processor implementation. @@ -146,6 +149,13 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm @GridToStringExclude private IgniteBinary binaries; + /** Listener removes all registred binary schemas after the local client reconnected. */ + private final GridLocalEventListener clientDisconLsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + binaryContext().unregisterBinarySchemas(); + } + }; + /** Metadata updates collected before metadata cache is initialized. */ private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>(); @@ -165,6 +175,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { if (marsh instanceof BinaryMarshaller) { + if (ctx.clientNode()) + ctx.event().addLocalEventListener(clientDisconLsnr, EVT_CLIENT_NODE_DISCONNECTED); + BinaryMetadataHandler metaHnd = new BinaryMetadataHandler() { @Override public void addMeta(int typeId, BinaryType newMeta) throws BinaryObjectException { assert newMeta != null; @@ -252,6 +265,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + if (ctx.clientNode()) + ctx.event().removeLocalEventListener(clientDisconLsnr); + } + + /** {@inheritDoc} */ @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException { if (clientNode && !ctx.isDaemon()) { ctx.continuous().registerStaticRoutine( http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java new file mode 100644 index 0000000..b31447c --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +/** + */ +public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTest { + /** Client id. */ + public static final int CLIENT_ID = 1; + + /** Cache params. */ + public static final String CACHE_PARAMS = "PPRB_PARAMS"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new BinaryMarshaller()); + cfg.setIncludeEventTypes(EventType.EVTS_CACHE); + + if (getTestGridName(CLIENT_ID).equals(gridName)) + cfg.setClientMode(true); + else { + CacheConfiguration ccfg = getCacheConfiguration(); + + cfg.setCacheConfiguration(ccfg); + } + + return cfg; + } + + /** + * @return CacheConfiguration Cache configuration. + */ + @NotNull private CacheConfiguration getCacheConfiguration() { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(CACHE_PARAMS); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + + List<QueryEntity> queryEntities = new ArrayList<>(); + + QueryEntity entity = new QueryEntity(); + + entity.setValueType("Params"); + entity.setKeyType("java.lang.Long"); + + LinkedHashMap<String, String> fields = new LinkedHashMap<>(); + + fields.put("ID", "java.lang.Long" ); + fields.put("PARTITIONID", "java.lang.Long"); + fields.put("CLIENTID", "java.lang.Long"); + fields.put("PARAMETRCODE", "java.lang.Long"); + fields.put("PARAMETRVALUE", "java.lang.Object"); + fields.put("PARENTID", "java.lang.Long"); + + entity.setFields(fields); + + List<QueryIndex> indexes = new ArrayList<>(); + + indexes.add(new QueryIndex("CLIENTID")); + indexes.add(new QueryIndex("ID")); + indexes.add(new QueryIndex("PARENTID")); + + entity.setIndexes(indexes); + + queryEntities.add(entity); + + ccfg.setQueryEntities(queryEntities); + return ccfg; + } + + /** */ + public void testReconnectClient() throws Exception { + try { + Ignite igniteSrv = startGrid(0); + + Ignite client = startGrid(1); + + checkTopology(2); + + client.events().localListen(new IgnitePredicate<Event>() { + + @Override public boolean apply(Event event) { + switch (event.type()) { + case EventType.EVT_CLIENT_NODE_DISCONNECTED: + info("Client disconnected"); + + break; + case EventType.EVT_CLIENT_NODE_RECONNECTED: + info("Client reconnected"); + } + + return true; + } + }, EventType.EVT_CLIENT_NODE_DISCONNECTED, EventType.EVT_CLIENT_NODE_RECONNECTED); + + IgniteDataStreamer<Long, BinaryObject> streamer = client.dataStreamer(CACHE_PARAMS); + + streamer.allowOverwrite(true); + streamer.keepBinary(true); + streamer.perNodeBufferSize(10000); + streamer.perNodeParallelOperations(100); + + BinaryObjectBuilder builder = client.binary().builder("PARAMS"); + + builder.setField("ID", 1L); + builder.setField("PARTITIONID", 1L); + builder.setField("CLIENTID", 1L); + builder.setField("PARAMETRCODE", 1L); + builder.setField("PARAMETRVALUE", "Test value"); + builder.setField("PARENTID", 1L); + BinaryObject obj = builder.build(); + + streamer.addData(1L, obj); + streamer.flush(); + + stopAllServers(false); + + Thread.sleep(2_000); + + igniteSrv = startGrid(0); + + Thread.sleep(2_000); + + checkTopology(2); + + info("Pre-insert"); + + streamer = client.dataStreamer("PPRB_PARAMS"); + streamer.allowOverwrite(true); + streamer.keepBinary(true); + streamer.perNodeBufferSize(10000); + streamer.perNodeParallelOperations(100); + + IgniteCache<Long, BinaryObject> cache = client.getOrCreateCache(CACHE_PARAMS).withKeepBinary(); + + builder = client.binary().builder("PARAMS"); + builder.setField("ID", 2L); + builder.setField("PARTITIONID", 1L); + builder.setField("CLIENTID", 1L); + builder.setField("PARAMETRCODE", 1L); + builder.setField("PARAMETRVALUE", "Test value"); + builder.setField("PARENTID", 1L); + obj = builder.build(); + + //streamer.addData(2L, obj); + cache.put(2L, obj); + + builder = client.binary().builder("PARAMS"); + builder.setField("ID", 3L); + builder.setField("PARTITIONID", 1L); + builder.setField("CLIENTID", 1L); + builder.setField("PARAMETRCODE", 1L); + builder.setField("PARAMETRVALUE", "Test value"); + builder.setField("PARENTID", 1L); + obj = builder.build(); + + //streamer.addData(3L, obj); + cache.put(3L, obj); + + builder = client.binary().builder("PARAMS"); + builder.setField("ID", 4L); + builder.setField("PARTITIONID", 1L); + builder.setField("CLIENTID", 1L); + builder.setField("PARAMETRCODE", 1L); + builder.setField("PARAMETRVALUE", "Test value"); + builder.setField("PARENTID", 1L); + obj = builder.build(); + + cache.put(4L, obj); + + info("Post-insert"); + + obj = cache.get(4L); + + assertNotNull(obj); + + info("End"); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index 96e8551..4528b30 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest; import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest; import org.apache.ignite.internal.processors.cache.CacheOperationsWithExpirationTest; import org.apache.ignite.internal.processors.cache.CacheRandomOperationsMultithreadedTest; +import org.apache.ignite.internal.processors.cache.ClientReconnectAfterClusterRestartTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest; @@ -79,6 +80,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(CacheOperationsWithExpirationTest.class); suite.addTestSuite(CacheBinaryKeyConcurrentQueryTest.class); + suite.addTestSuite(ClientReconnectAfterClusterRestartTest.class); + return suite; } -} \ No newline at end of file +}