IGNITE-5830 - Introduce cache start and stop order during cluster activation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7915fd88 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7915fd88 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7915fd88 Branch: refs/heads/master Commit: 7915fd88b1f3e399777bbc46f4e5625b68fb90c9 Parents: 85f1702 Author: Jokser <[email protected]> Authored: Wed Jul 26 12:08:03 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Jul 26 12:08:03 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ClusterCachesInfo.java | 135 ++++++++++++------- .../processors/cache/ExchangeActions.java | 4 +- .../processors/cache/GridCacheProcessor.java | 29 ++-- 3 files changed, 106 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 949bc19..1a05b96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -58,6 +59,7 @@ import org.apache.ignite.plugin.CachePluginContext; import org.apache.ignite.plugin.CachePluginProvider; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; @@ -97,10 +99,10 @@ class ClusterCachesInfo { private GridData gridData; /** */ - private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches; + private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = Collections.emptyList(); /** */ - private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation; + private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation = Collections.emptyMap(); /** */ private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs; @@ -111,7 +113,7 @@ class ClusterCachesInfo { /** * @param ctx Context. */ - ClusterCachesInfo(GridKernalContext ctx) { + public ClusterCachesInfo(GridKernalContext ctx) { this.ctx = ctx; log = ctx.log(getClass()); @@ -121,7 +123,7 @@ class ClusterCachesInfo { * @param joinDiscoData Information about configured caches and templates. * @throws IgniteCheckedException If configuration validation failed. */ - void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException { + public void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException { this.joinDiscoData = joinDiscoData; Map<String, CacheConfiguration> grpCfgs = new HashMap<>(); @@ -159,7 +161,7 @@ class ClusterCachesInfo { * @param checkConsistency {@code True} if need check cache configurations consistency. * @throws IgniteCheckedException If failed. */ - void onKernalStart(boolean checkConsistency) throws IgniteCheckedException { + public void onKernalStart(boolean checkConsistency) throws IgniteCheckedException { if (gridData != null && gridData.conflictErr != null) throw new IgniteCheckedException(gridData.conflictErr); @@ -330,7 +332,7 @@ class ClusterCachesInfo { * @param msg Message. * @param node Node sent message. */ - void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) { + public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) { Map<Integer, Boolean> startedCaches = msg.startedCaches(); if (startedCaches != null) { @@ -359,12 +361,13 @@ class ClusterCachesInfo { } } } + /** * @param batch Cache change request. * @param topVer Topology version. * @return {@code True} if minor topology version should be increased. */ - boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) { + public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) { DiscoveryDataClusterState state = ctx.state().clusterState(); if (state.active() && !state.transition()) { @@ -779,30 +782,28 @@ class ClusterCachesInfo { * * @return Caches to be started when this node starts. */ - List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() { + @NotNull public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() { if (ctx.isDaemon()) return Collections.emptyList(); - assert locJoinStartCaches != null; - - List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = this.locJoinStartCaches; + List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> result = locJoinStartCaches; - this.locJoinStartCaches = null; + locJoinStartCaches = Collections.emptyList(); - return locJoinStartCaches; + return result; } /** * @param joinedNodeId Joined node ID. * @return New caches received from joined node. */ - List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) { + @NotNull public List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) { assert joinedNodeId != null; List<DynamicCacheDescriptor> started = null; if (!ctx.isDaemon()) { - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) { if (desc.staticallyConfigured()) { assert desc.receivedFrom() != null : desc; @@ -826,7 +827,7 @@ class ClusterCachesInfo { * @param node Event node. * @param topVer Topology version. */ - void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && !ctx.isDaemon()) { for (CacheGroupDescriptor desc : registeredCacheGrps.values()) { if (node.id().equals(desc.receivedFrom())) @@ -856,7 +857,7 @@ class ClusterCachesInfo { /** * @param dataBag Discovery data bag. */ - void collectGridNodeData(DiscoveryDataBag dataBag) { + public void collectGridNodeData(DiscoveryDataBag dataBag) { if (ctx.isDaemon()) return; @@ -931,7 +932,7 @@ class ClusterCachesInfo { /** * @param data Discovery data. */ - void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { + public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { if (ctx.isDaemon() || data.commonData() == null) return; @@ -1045,6 +1046,9 @@ class ClusterCachesInfo { } /** + * Initialize collection with caches to be start: + * {@code locJoinStartCaches} or {@code locCfgsForActivation} if cluster is inactive. + * * @param firstNode {@code True} if first node in cluster starts. */ private void initStartCachesForLocalJoin(boolean firstNode) { @@ -1062,7 +1066,7 @@ class ClusterCachesInfo { boolean active = ctx.state().clusterState().active(); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) { if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName())) continue; @@ -1096,13 +1100,8 @@ class ClusterCachesInfo { if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) { - if (active) { - // Move system and internal caches first. - if (desc.cacheType().userCache()) - locJoinStartCaches.add(new T2<>(desc, nearCfg)); - else - locJoinStartCaches.add(0, new T2<>(desc, nearCfg)); - } + if (active) + locJoinStartCaches.add(new T2<>(desc, nearCfg)); else locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg)); } @@ -1113,7 +1112,7 @@ class ClusterCachesInfo { /** * @param msg Message. */ - void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) { + public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) { if (joinOnTransition) { initStartCachesForLocalJoin(false); @@ -1127,17 +1126,14 @@ class ClusterCachesInfo { * @return Exchange action. * @throws IgniteCheckedException If configuration validation failed. */ - ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) + public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) throws IgniteCheckedException { ExchangeActions exchangeActions = new ExchangeActions(); if (msg.activate()) { - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) { desc.startTopologyVersion(topVer); - T2<CacheConfiguration, NearCacheConfiguration> locCfg = !F.isEmpty(locCfgsForActivation) ? - locCfgsForActivation.get(desc.cacheName()) : null; - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(), desc.cacheName(), msg.initiatorNodeId()); @@ -1145,6 +1141,8 @@ class ClusterCachesInfo { req.startCacheConfiguration(desc.cacheConfiguration()); req.cacheType(desc.cacheType()); + T2<CacheConfiguration, NearCacheConfiguration> locCfg = locCfgsForActivation.get(desc.cacheName()); + if (locCfg != null) { if (locCfg.get1() != null) req.startCacheConfiguration(locCfg.get1()); @@ -1199,7 +1197,7 @@ class ClusterCachesInfo { else { locCfgsForActivation = new HashMap<>(); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.REVERSE)) { DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, desc.cacheName(), desc.sql(), @@ -1221,7 +1219,7 @@ class ClusterCachesInfo { /** * @param data Joining node data. */ - void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { + public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { if (data.hasJoiningNodeData()) { Serializable joiningNodeData = data.joiningNodeData(); @@ -1264,8 +1262,10 @@ class ClusterCachesInfo { } /** + * Checks cache configuration on conflict with already registered caches and cache groups. + * * @param cfg Cache configuration. - * @return {@code True} if validation passed. + * @return {@code null} if validation passed, error message in other case. */ private String checkCacheConflict(CacheConfiguration<?, ?> cfg) { int cacheId = CU.cacheId(cfg.getName()); @@ -1480,17 +1480,10 @@ class ClusterCachesInfo { } /** - * @return Registered cache groups. - */ - ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() { - return registeredCacheGrps; - } - - /** * @param ccfg Cache configuration to start. * @throws IgniteCheckedException If failed. */ - void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException { + public void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException { if (ccfg.getGroupName() != null) { CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName()); @@ -1563,9 +1556,29 @@ class ClusterCachesInfo { } /** + * @return Registered cache groups. + */ + ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() { + return registeredCacheGrps; + } + + /** + * Returns registered cache descriptors ordered by {@code comparator} + * @param comparator Comparator (DIRECT, REVERSE or custom) to order cache descriptors. + * @return Ordered by comparator cache descriptors. + */ + private Collection<DynamicCacheDescriptor> orderedCaches(Comparator<DynamicCacheDescriptor> comparator) { + List<DynamicCacheDescriptor> ordered = new ArrayList<>(); + ordered.addAll(registeredCaches.values()); + + Collections.sort(ordered, comparator); + return ordered; + } + + /** * */ - void onDisconnect() { + public void onDisconnected() { cachesOnDisconnect = new CachesOnDisconnect( ctx.state().clusterState(), new HashMap<>(registeredCacheGrps), @@ -1583,7 +1596,7 @@ class ClusterCachesInfo { * @param transition {@code True} if reconnected while state transition in progress. * @return Information about stopped caches and cache groups. */ - ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) { + public ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) { assert disconnectedState(); Set<String> stoppedCaches = new HashSet<>(); @@ -1685,6 +1698,38 @@ class ClusterCachesInfo { } /** + * Holds direct comparator (first system caches) and reverse comparator (first user caches). + * Use DIRECT comparator for ordering cache start operations. + * Use REVERSE comparator for ordering cache stop operations. + */ + private static class CacheComparators { + /** + * DIRECT comparator for cache descriptors (first system caches). + */ + static Comparator<DynamicCacheDescriptor> DIRECT = new Comparator<DynamicCacheDescriptor>() { + @Override + public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) { + if (!o1.cacheType().userCache()) + return -1; + if (!o2.cacheType().userCache()) + return 1; + + return o1.cacheId().compareTo(o2.cacheId()); + } + }; + + /** + * REVERSE comparator for cache descriptors (first user caches). + */ + static Comparator<DynamicCacheDescriptor> REVERSE = new Comparator<DynamicCacheDescriptor>() { + @Override + public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) { + return -DIRECT.compare(o1, o2); + } + }; + } + + /** * */ private static class GridData { http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index 1cc6438..91ad003 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -224,7 +224,7 @@ public class ExchangeActions { assert desc != null; if (map == null) - map = new HashMap<>(); + map = new LinkedHashMap<>(); CacheActionData old = map.put(req.cacheName(), new CacheActionData(req, desc)); http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5b709b3..9902a92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -146,6 +146,7 @@ import org.apache.ignite.spi.IgniteNodeValidationResult; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; @@ -1046,7 +1047,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.onDisconnected(reconnectFut); - cachesInfo.onDisconnect(); + cachesInfo.onDisconnected(); } /** @@ -1733,7 +1734,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @return Caches to be started when this node starts. */ - public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() { + @NotNull public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() { return cachesInfo.cachesToStartOnLocalJoin(); } @@ -1771,22 +1772,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { throws IgniteCheckedException { List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId); - if (started != null) { - for (DynamicCacheDescriptor desc : started) { - IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter(); - - if (CU.affinityNode(ctx.discovery().localNode(), filter)) { - prepareCacheStart( - desc.cacheConfiguration(), - desc, - null, - exchTopVer - ); - } + for (DynamicCacheDescriptor desc : started) { + IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter(); + + if (CU.affinityNode(ctx.discovery().localNode(), filter)) { + prepareCacheStart( + desc.cacheConfiguration(), + desc, + null, + exchTopVer + ); } } - return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList(); + return started; } /**
