ignite-1564 Fixed client cache reconnect issues
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/273f291d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/273f291d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/273f291d Branch: refs/heads/ignite-1.4.2 Commit: 273f291d9fac0919d57b9ed732564b323a956f90 Parents: cd43967 Author: sboikov <sboi...@gridgain.com> Authored: Wed Sep 30 13:48:48 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Sep 30 13:48:48 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 22 +-- .../processors/cache/GridCacheEventManager.java | 12 +- .../processors/cache/GridCacheProcessor.java | 68 +++++--- .../dht/preloader/GridDhtPreloader.java | 6 + .../IgniteClientReconnectAbstractTest.java | 35 ++++- .../IgniteClientReconnectCacheTest.java | 154 +++++++++++++++++++ 6 files changed, 249 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 785613d..aec36a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -78,7 +78,7 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.F0; -import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; +import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -208,7 +208,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Topology cache history. */ private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist = - new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE, DISCOVERY_HISTORY_SIZE, 0.7f, 1); + new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE); /** Topology snapshots history. */ private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>(); @@ -465,14 +465,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { final Map<Long, Collection<ClusterNode>> snapshots, @Nullable DiscoverySpiCustomMessage spiCustomMsg ) { - if (type == EVT_NODE_JOINED && node.isLocal() && ctx.clientDisconnected()) { - discoCacheHist.clear(); - - topHist.clear(); - - topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, null)); - } - DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null : ((CustomMessageWrapper)spiCustomMsg).delegate(); @@ -593,6 +585,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { locJoinEvt = new GridFutureAdapter<>(); registeredCaches.clear(); + + discoCacheHist.clear(); + + topHist.clear(); + + topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, + new DiscoCache(locNode, Collections.<ClusterNode>emptySet()))); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -620,7 +619,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); + if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected()) + discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index c2f8f3f..751c316 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -40,14 +40,6 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED; * Cache event manager. */ public class GridCacheEventManager extends GridCacheManagerAdapter { - /** Local node ID. */ - private UUID locNodeId; - - /** {@inheritDoc} */ - @Override public void start0() { - locNodeId = cctx.localNodeId(); - } - /** * Adds local event listener. * @@ -96,7 +88,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { { addEvent(part, key, - locNodeId, + cctx.localNodeId(), tx, owner, type, @@ -116,7 +108,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { addEvent( 0, null, - locNodeId, + cctx.localNodeId(), (IgniteUuid)null, null, type, http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c86dfd9..6c13399 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -26,6 +26,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; @@ -197,6 +198,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** */ private Map<String, DynamicCacheDescriptor> cachesOnDisconnect; + /** */ + private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs; + /** * @param ctx Kernal context. */ @@ -1050,6 +1054,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + if (clientReconnectReqs != null) { + for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet()) + processClientReconnectData(e.getKey(), e.getValue()); + + clientReconnectReqs = null; + } + sharedCtx.onReconnected(); for (GridCacheAdapter cache : reconnected) @@ -1881,28 +1892,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; if (batch.clientReconnect()) { - for (DynamicCacheChangeRequest req : batch.requests()) { - assert !req.template() : req; - - String name = req.cacheName(); - - boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); + if (ctx.clientDisconnected()) { + if (clientReconnectReqs == null) + clientReconnectReqs = new LinkedHashMap<>(); - if (!sysCache) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + clientReconnectReqs.put(joiningNodeId, batch); - if (desc != null && desc.deploymentId().equals(req.deploymentId())) { - Map<UUID, Boolean> nodes = batch.clientNodes().get(name); - - assert nodes != null : req; - assert nodes.containsKey(joiningNodeId) : nodes; - - ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId)); - } - } - else - ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false); + return; } + + processClientReconnectData(joiningNodeId, batch); } else { for (DynamicCacheChangeRequest req : batch.requests()) { @@ -1983,6 +1982,37 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param clientNodeId Client node ID. + * @param batch Cache change batch. + */ + private void processClientReconnectData(UUID clientNodeId, DynamicCacheChangeBatch batch) { + assert batch.clientReconnect() : batch; + + for (DynamicCacheChangeRequest req : batch.requests()) { + assert !req.template() : req; + + String name = req.cacheName(); + + boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); + + if (!sysCache) { + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + + if (desc != null && desc.deploymentId().equals(req.deploymentId())) { + Map<UUID, Boolean> nodes = batch.clientNodes().get(name); + + assert nodes != null : req; + assert nodes.containsKey(clientNodeId) : nodes; + + ctx.discovery().addClientNode(req.cacheName(), clientNodeId, nodes.get(clientNodeId)); + } + } + else + ctx.discovery().addClientNode(req.cacheName(), clientNodeId, false); + } + } + + /** * Dynamically starts cache using template configuration. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 9d5fdca..19b461e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -260,6 +260,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void onReconnected() { startFut = new GridFutureAdapter<>(); + + long topVer0 = cctx.kernalContext().discovery().topologyVersion(); + + assert topVer0 > 0 : topVer0; + + topVer.set(topVer0); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 3a6d04f..0c1df7f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal; import java.io.IOException; import java.net.Socket; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -199,15 +201,28 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra */ protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC) throws Exception { - final TestTcpDiscoverySpi clientSpi = spi(client); + reconnectClientNodes(Collections.singletonList(client), srv, disconnectedC); + } + + /** + * Reconnect client node. + * + * @param clients Clients. + * @param srv Server. + * @param disconnectedC Closure which will be run when client node disconnected. + * @throws Exception If failed. + */ + protected void reconnectClientNodes(List<Ignite> clients, Ignite srv, @Nullable Runnable disconnectedC) + throws Exception { final TestTcpDiscoverySpi srvSpi = spi(srv); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); + final CountDownLatch disconnectLatch = new CountDownLatch(clients.size()); + final CountDownLatch reconnectLatch = new CountDownLatch(clients.size()); log.info("Block reconnect."); - clientSpi.writeLatch = new CountDownLatch(1); + for (Ignite client : clients) + spi(client).writeLatch = new CountDownLatch(1); IgnitePredicate<Event> p = new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { @@ -226,9 +241,11 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra } }; - client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + for (Ignite client : clients) + client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - srvSpi.failNode(client.cluster().localNode().id(), null); + for (Ignite client : clients) + srvSpi.failNode(client.cluster().localNode().id(), null); waitReconnectEvent(disconnectLatch); @@ -237,11 +254,13 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra log.info("Allow reconnect."); - clientSpi.writeLatch.countDown(); + for (Ignite client : clients) + spi(client).writeLatch.countDown(); waitReconnectEvent(reconnectLatch); - client.events().stopLocalListen(p); + for (Ignite client : clients) + client.events().stopLocalListen(p); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 807027c..edd95e9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -998,6 +999,159 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } /** + * @throws Exception If failed. + */ + public void testReconnectClusterRestartMultinode() throws Exception { + clientMode = true; + + final int CLIENTS = 5; + + CountDownLatch disconnectLatch = new CountDownLatch(CLIENTS); + CountDownLatch reconnectLatch = new CountDownLatch(CLIENTS); + + List<IgniteCache> caches = new ArrayList<>(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite client = startGrid(SRV_CNT + i); + + addListener(client, disconnectLatch, reconnectLatch); + + IgniteCache cache = client.getOrCreateCache(new CacheConfiguration<>()); + + assertNotNull(cache); + + caches.add(cache); + } + + for (int i = 0; i < SRV_CNT; i++) + stopGrid(i); + + assertTrue(disconnectLatch.await(30_000, MILLISECONDS)); + + log.info("Restart servers."); + + clientMode = false; + + startGridsMultiThreaded(0, SRV_CNT); + + assertTrue(reconnectLatch.await(30_000, MILLISECONDS)); + + for (final IgniteCache clientCache : caches) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return clientCache.get(1); + } + }, IllegalStateException.class, null); + } + + for (int i = 0; i < SRV_CNT + CLIENTS; i++) { + Ignite ignite = grid(i); + + ClusterGroup grp = ignite.cluster().forCacheNodes(null); + + assertEquals(0, grp.nodes().size()); + + grp = ignite.cluster().forClientNodes(null); + + assertEquals(0, grp.nodes().size()); + } + } + + /** + * @throws Exception If failed. + */ + public void testReconnectMultinode() throws Exception { + grid(0).createCache(new CacheConfiguration<>()); + + clientMode = true; + + final int CLIENTS = 2; + + List<Ignite> clients = new ArrayList<>(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite client = startGrid(SRV_CNT + i); + + assertNotNull(client.getOrCreateCache(new CacheConfiguration<>())); + + clients.add(client); + } + + int nodes = SRV_CNT + CLIENTS; + int srvNodes = SRV_CNT; + + for (int iter = 0; iter < 3; iter++) { + log.info("Iteration: " + iter); + + reconnectClientNodes(clients, grid(0), null); + + for (Ignite client : clients) { + IgniteCache<Object, Object> cache = client.cache(null); + + assertNotNull(cache); + + cache.put(client.name(), 1); + + assertEquals(1, cache.get(client.name())); + + ClusterGroup grp = client.cluster().forCacheNodes(null); + + assertEquals(CLIENTS + srvNodes, grp.nodes().size()); + + grp = client.cluster().forClientNodes(null); + + assertEquals(CLIENTS, grp.nodes().size()); + } + + for (int i = 0; i < nodes; i++) { + Ignite ignite = grid(i); + + ClusterGroup grp = ignite.cluster().forCacheNodes(null); + + assertEquals(CLIENTS + srvNodes, grp.nodes().size()); + + grp = ignite.cluster().forClientNodes(null); + + assertEquals(CLIENTS, grp.nodes().size()); + } + + clientMode = false; + + startGrid(nodes++); + + srvNodes++; + + clientMode = true; + + startGrid(nodes++); + } + } + + /** + * @param client Client. + * @param disconnectLatch Disconnect event latch. + * @param reconnectLatch Reconnect event latch. + */ + private void addListener(Ignite client, final CountDownLatch disconnectLatch, final CountDownLatch reconnectLatch) { + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + } + + /** * */ static class TestClass1 implements Serializable {}