Merge remote-tracking branch 'remotes/origin/master' into ignite-5075-cacheStart
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88b207db Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88b207db Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88b207db Branch: refs/heads/ignite-5075 Commit: 88b207dbc72a7b35b1860f849e0653c4e56a57f8 Parents: aa0142a Author: sboikov <[email protected]> Authored: Thu May 11 12:54:14 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 11 12:54:14 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheJoinNodeDiscoveryData.java | 15 ++++++++- .../processors/cache/ClusterCachesInfo.java | 34 ++++++++++++-------- .../cache/DynamicCacheChangeBatch.java | 17 ---------- .../processors/cache/GridCacheProcessor.java | 22 ++++++++++--- .../cache/distributed/CacheStartOnJoinTest.java | 10 ++++++ 5 files changed, 62 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/88b207db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java index c569818..ea24140 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java @@ -43,18 +43,31 @@ class CacheJoinNodeDiscoveryData implements Serializable { @GridToStringInclude private final IgniteUuid cacheDeploymentId; + /** */ + private final boolean startCaches; + /** * @param cacheDeploymentId Deployment ID for started caches. * @param caches Caches. * @param templates Templates. + * @param startCaches {@code True} if required to start all caches on joining node. */ CacheJoinNodeDiscoveryData( IgniteUuid cacheDeploymentId, Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches, - Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) { + Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates, + boolean startCaches) { this.cacheDeploymentId = cacheDeploymentId; this.caches = caches; this.templates = templates; + this.startCaches = startCaches; + } + + /** + * @return {@code True} if required to start all caches on joining node. + */ + boolean startCaches() { + return startCaches; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/88b207db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 059c8ac..f74343b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -82,9 +82,6 @@ class ClusterCachesInfo { private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches; /** */ - private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>(); - - /** */ private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs; /** @@ -104,6 +101,7 @@ class ClusterCachesInfo { } /** + * @param checkConsistency {@code True} if need check cache configurations consistency. * @throws IgniteCheckedException If failed. */ void onKernalStart(boolean checkConsistency) throws IgniteCheckedException { @@ -571,16 +569,21 @@ class ClusterCachesInfo { desc = desc0; } - if (locCfg != null || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) + if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) locJoinStartCaches.add(new T2<>(desc, nearCfg)); } } } else { - CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id()); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (desc.startTopologyVersion() == null && node.id().equals(desc.receivedFrom())) + desc.startTopologyVersion(topVer); + } - if (discoData != null) - processJoiningNode(discoData, node.id(), topVer); + for (DynamicCacheDescriptor desc : registeredTemplates().values()) { + if (desc.startTopologyVersion() == null && node.id().equals(desc.receivedFrom())) + desc.startTopologyVersion(topVer); + } } } } @@ -722,12 +725,8 @@ class ClusterCachesInfo { else processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId()); } - else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) { - CacheJoinNodeDiscoveryData old = - joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData); - - assert old == null : old; - } + else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) + processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), null); } } @@ -753,6 +752,7 @@ class ClusterCachesInfo { /** * @param joinData Joined node discovery data. * @param nodeId Joined node ID. + * @param topVer Topology version. */ private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) { for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { @@ -804,6 +804,14 @@ class ClusterCachesInfo { ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null); } + + if (joinData.startCaches()) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + ctx.discovery().addClientNode(desc.cacheName(), + nodeId, + desc.cacheConfiguration().getNearConfiguration() != null); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/88b207db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index e72f214..3c65326 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -42,9 +42,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** Cache updates to be executed on exchange. */ private transient ExchangeActions exchangeActions; - /** */ - private boolean startCaches; - /** * @param reqs Requests. */ @@ -77,20 +74,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { } /** - * @return {@code True} if required to start all caches on client node. - */ - public boolean startCaches() { - return startCaches; - } - - /** - * @param startCaches {@code True} if required to start all caches on client node. - */ - public void startCaches(boolean startCaches) { - this.startCaches = startCaches; - } - - /** * @return {@code True} if request should trigger partition exchange. */ public boolean exchangeNeeded() { http://git-wip-us.apache.org/repos/asf/ignite/blob/88b207db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b446b32..e60f261 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -636,16 +636,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager mgr : sharedCtx.managers()) mgr.start(sharedCtx); - Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>(); + if (activeOnStart && !ctx.config().isDaemon()) { + Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>(); - Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>(); + Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>(); - if (activeOnStart && !ctx.config().isDaemon()) { registerCacheFromConfig(caches, templates); + registerCacheFromPersistentStore(caches, templates); - } - cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), caches, templates)); + CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), + caches, + templates, + startAllCachesOnClientStart()); + + cachesInfo.onStart(discoData); + } + else { + cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), + Collections.<String, CacheJoinNodeDiscoveryData.CacheInfo>emptyMap(), + Collections.<String, CacheJoinNodeDiscoveryData.CacheInfo>emptyMap(), + false)); + } if (log.isDebugEnabled()) log.debug("Started cache processor."); http://git-wip-us.apache.org/repos/asf/ignite/blob/88b207db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java index 321faf8..88df607 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java @@ -34,6 +34,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -210,6 +211,8 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest { Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes(); assertEquals(NODES, nodes.size()); + + checkCache(node, "cache-" + c); } for (int c = 0; c < 5; c++) { @@ -247,4 +250,11 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest { return ccfg; } + /** + * @param node Node. + * @param cacheName Cache name. + */ + private void checkCache(Ignite node, final String cacheName) { + assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName)); + } } \ No newline at end of file
