ignite-6667 Reuse DiscoCache when possible
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/14f04c4c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/14f04c4c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/14f04c4c Branch: refs/heads/ignite-3478 Commit: 14f04c4ce80178dc55ee62b3cf09dd4ec129f3e2 Parents: 4c8bc53 Author: Aleksei Scherbakov <[email protected]> Authored: Thu Oct 26 17:13:18 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 26 17:13:18 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 45 +++-- .../discovery/DiscoveryCustomMessage.java | 13 +- .../discovery/GridDiscoveryManager.java | 190 +++++++++++-------- .../cache/CacheAffinityChangeMessage.java | 8 + .../ClientCacheChangeDiscoveryMessage.java | 9 + .../ClientCacheChangeDummyDiscoveryMessage.java | 9 + .../cache/DynamicCacheChangeBatch.java | 9 + .../binary/MetadataUpdateAcceptedMessage.java | 9 + .../binary/MetadataUpdateProposedMessage.java | 9 + .../cluster/ChangeGlobalStateFinishMessage.java | 9 + .../cluster/ChangeGlobalStateMessage.java | 11 +- .../continuous/AbstractContinuousMessage.java | 10 + .../StartRoutineAckDiscoveryMessage.java | 2 +- .../StartRoutineDiscoveryMessage.java | 2 +- .../StopRoutineAckDiscoveryMessage.java | 2 +- .../continuous/StopRoutineDiscoveryMessage.java | 2 +- .../marshaller/MappingAcceptedMessage.java | 9 + .../marshaller/MappingProposedMessage.java | 9 + .../message/SchemaAbstractDiscoveryMessage.java | 10 + .../message/SchemaProposeDiscoveryMessage.java | 3 +- .../IgniteDiscoveryCacheReuseSelfTest.java | 89 +++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 6 +- 22 files changed, 359 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 4b57eb8..9ed70aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.managers.discovery; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -71,10 +70,10 @@ public class DiscoCache { private final Map<Integer, List<ClusterNode>> cacheGrpAffNodes; /** Node map. */ - private final Map<UUID, ClusterNode> nodeMap; + final Map<UUID, ClusterNode> nodeMap; /** Alive nodes. */ - private final Set<UUID> alives = new GridConcurrentHashSet<>(); + final Set<UUID> alives = new GridConcurrentHashSet<>(); /** */ private final IgniteProductVersion minNodeVer; @@ -95,6 +94,7 @@ public class DiscoCache { * @param cacheGrpAffNodes Affinity nodes by cache group ID. * @param nodeMap Node map. * @param alives Alive nodes. + * @param minNodeVer Minimum node version. */ DiscoCache( AffinityTopologyVersion topVer, @@ -108,7 +108,8 @@ public class DiscoCache { Map<Integer, List<ClusterNode>> allCacheNodes, Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Map<UUID, ClusterNode> nodeMap, - Set<UUID> alives) { + Set<UUID> alives, + IgniteProductVersion minNodeVer) { this.topVer = topVer; this.state = state; this.loc = loc; @@ -121,19 +122,7 @@ public class DiscoCache { this.cacheGrpAffNodes = cacheGrpAffNodes; this.nodeMap = nodeMap; this.alives.addAll(alives); - - IgniteProductVersion minVer = null; - - for (int i = 0; i < allNodes.size(); i++) { - ClusterNode node = allNodes.get(i); - - if (minVer == null) - minVer = node.version(); - else if (node.version().compareTo(minVer) < 0) - minVer = node.version(); - } - - minNodeVer = minVer; + this.minNodeVer = minNodeVer; } /** @@ -326,6 +315,28 @@ public class DiscoCache { return nodes == null ? Collections.<ClusterNode>emptyList() : nodes; } + /** + * @param ver Topology version. + * @param state Not {@code null} state if need override state, otherwise current state is used. + * @return Copy of discovery cache with new version. + */ + public DiscoCache copy(AffinityTopologyVersion ver, @Nullable DiscoveryDataClusterState state) { + return new DiscoCache( + ver, + state == null ? this.state : state, + loc, + rmtNodes, + allNodes, + srvNodes, + daemonNodes, + rmtNodesWithCaches, + allCacheNodes, + cacheGrpAffNodes, + nodeMap, + alives, + minNodeVer); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DiscoCache.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java index f908b59..c708c62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import java.io.Serializable; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; @@ -89,4 +90,14 @@ public interface DiscoveryCustomMessage extends Serializable { * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. */ public boolean isMutable(); -} \ No newline at end of file + + /** + * Creates new discovery cache if message caused topology version change. + * + * @param mgr Discovery manager. + * @param topVer New topology version. + * @param discoCache Current discovery cache. + * @return Reused discovery cache. + */ + public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 77b0622..a6737dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; @@ -102,6 +103,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.segmentation.SegmentationPolicy; @@ -623,23 +625,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { updateClientNodes(node.id()); } - DiscoCache discoCache = null; - boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id()); - IgniteInternalFuture<Boolean> transitionWaitFut = null; - ChangeGlobalStateFinishMessage stateFinishMsg = null; - if (locJoinEvt) { - discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer), - ctx.state().clusterState(), - locNode, - topSnapshot); - - transitionWaitFut = ctx.state().onLocalJoin(discoCache); - } - else if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) + if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) stateFinishMsg = ctx.state().onNodeLeft(node); final AffinityTopologyVersion nextTopVer; @@ -658,12 +648,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { else if (customMsg instanceof ChangeGlobalStateFinishMessage) { ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg); - discoCache = createDiscoCache(topSnap.get().topVer, - ctx.state().clusterState(), - locNode, - topSnapshot); + Snapshot snapshot = topSnap.get(); + + // Topology version does not change, but need create DiscoCache with new state. + DiscoCache discoCache = snapshot.discoCache.copy(snapshot.topVer, ctx.state().clusterState()); - topSnap.set(new Snapshot(topSnap.get().topVer, discoCache)); + topSnap.set(new Snapshot(snapshot.topVer, discoCache)); incMinorTopVer = false; } @@ -704,24 +694,30 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } + DiscoCache discoCache; + // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. if (verChanged) { - if (discoCache == null) { + Snapshot snapshot = topSnap.get(); + + if (customMsg == null) { discoCache = createDiscoCache(nextTopVer, ctx.state().clusterState(), locNode, topSnapshot); } + else + discoCache = customMsg.createDiscoCache(GridDiscoveryManager.this, nextTopVer, snapshot.discoCache); discoCacheHist.put(nextTopVer, discoCache); - boolean set = updateTopologyVersionIfGreater(nextTopVer, discoCache); - - assert set || topVer == 0 : "Topology version has not been updated [this.topVer=" + - topSnap + ", topVer=" + topVer + ", node=" + node + + assert snapshot.topVer.compareTo(nextTopVer) < 0: "Topology version out of order [this.topVer=" + + topSnap + ", topVer=" + topVer + ", node=" + node + ", nextTopVer=" + nextTopVer + ", evt=" + U.gridEventName(type) + ']'; + + topSnap.set(new Snapshot(nextTopVer, discoCache)); } else // Current version. @@ -734,8 +730,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (gridStartTime == 0) gridStartTime = getSpi().getGridStartTime(); - updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()), - discoCache); + topSnap.set(new Snapshot(nextTopVer, discoCache)); startLatch.countDown(); @@ -752,6 +747,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (!isLocDaemon && !ctx.clientDisconnected()) ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache); + IgniteInternalFuture<Boolean> transitionWaitFut = ctx.state().onLocalJoin(discoCache); + locJoin.onDone(new DiscoveryLocalJoinData(discoEvt, discoCache, transitionWaitFut, @@ -784,7 +781,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topHist.clear(); topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, - createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>singleton(locNode)))); + createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, + Collections.<ClusterNode>singleton(locNode)) + )); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -1586,8 +1585,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (!locJoin.isDone()) - locJoin.onDone( - new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping).")); + locJoin.onDone(new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping).")); } /** {@inheritDoc} */ @@ -2219,6 +2217,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size()); ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size()); + IgniteProductVersion minVer = null; + for (ClusterNode node : topSnapshot) { if (alive(node)) alives.add(node.id()); @@ -2236,6 +2236,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } nodeMap.put(node.id(), node); + + if (minVer == null) + minVer = node.version(); + else if (node.version().compareTo(minVer) < 0) + minVer = node.version(); } assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + @@ -2243,39 +2248,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size()); - Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); - for (ClusterNode node : allNodes) { - assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; - assert !node.isDaemon(); - - for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) { - CacheGroupAffinity grpAff = e.getValue(); - Integer grpId = e.getKey(); - - if (CU.affinityNode(node, grpAff.cacheFilter)) { - List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId); - - if (nodes == null) - cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>()); - - nodes.add(node); - } - } - - for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { - String cacheName = entry.getKey(); - CachePredicate filter = entry.getValue(); - - if (filter.cacheNode(node)) { - if (!node.isLocal()) - rmtNodesWithCaches.add(node); - - addToMap(allCacheNodes, cacheName, node); - } - } - } + fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches); return new DiscoCache( topVer, @@ -2289,7 +2264,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Collections.unmodifiableMap(allCacheNodes), Collections.unmodifiableMap(cacheGrpAffNodes), Collections.unmodifiableMap(nodeMap), - alives); + alives, + minVer); } /** @@ -2311,26 +2287,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { cacheNodes.add(rich); } - /** - * Updates topology version if current version is smaller than updated. - * - * @param updated Updated topology version. - * @param discoCache Discovery cache. - * @return {@code True} if topology was updated. - */ - private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion updated, DiscoCache discoCache) { - while (true) { - Snapshot cur = topSnap.get(); - - if (updated.compareTo(cur.topVer) >= 0) { - if (topSnap.compareAndSet(cur, new Snapshot(updated, discoCache))) - return true; - } - else - return false; - } - } - /** Stops local node. */ private void stopNode() { new Thread( @@ -3061,4 +3017,78 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return clientNodes.get(node.id()); } } + + /** + * Fills affinity node caches. + * + * @param allNodes All nodes. + * @param allCacheNodes All cache nodes. + * @param cacheGrpAffNodes Cache group aff nodes. + * @param rmtNodesWithCaches Rmt nodes with caches. + */ + private void fillAffinityNodeCaches(List<ClusterNode> allNodes, Map<Integer, List<ClusterNode>> allCacheNodes, + Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Set<ClusterNode> rmtNodesWithCaches) { + for (ClusterNode node : allNodes) { + assert node.order() != 0 : "Invalid node order [locNode=" + localNode() + ", node=" + node + ']'; + assert !node.isDaemon(); + + for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) { + CacheGroupAffinity grpAff = e.getValue(); + Integer grpId = e.getKey(); + + if (CU.affinityNode(node, grpAff.cacheFilter)) { + List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId); + + if (nodes == null) + cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>()); + + nodes.add(node); + } + } + + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + String cacheName = entry.getKey(); + CachePredicate filter = entry.getValue(); + + if (filter.cacheNode(node)) { + if (!node.isLocal()) + rmtNodesWithCaches.add(node); + + addToMap(allCacheNodes, cacheName, node); + } + } + } + } + + /** + * Creates discovery cache after {@link DynamicCacheChangeBatch} received. + * + * @param topVer Topology version. + * @param discoCache Current disco cache. + * @return New discovery cache. + */ + public DiscoCache createDiscoCacheOnCacheChange(AffinityTopologyVersion topVer, + DiscoCache discoCache) { + List<ClusterNode> allNodes = discoCache.allNodes(); + Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); + Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size()); + Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + + fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches); + + return new DiscoCache( + topVer, + discoCache.state(), + discoCache.localNode(), + discoCache.remoteNodes(), + allNodes, + discoCache.serverNodes(), + discoCache.daemonNodes(), + U.sealList(rmtNodesWithCaches), + allCacheNodes, + cacheGrpAffNodes, + discoCache.nodeMap, + discoCache.alives, + discoCache.minimumNodeVersion()); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java index 8cff65e..fe1014c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; @@ -154,6 +156,12 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + return discoCache.copy(topVer, null); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheAffinityChangeMessage.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java index 3d120f7..e35d80e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java @@ -22,7 +22,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -170,6 +173,12 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage } /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ClientCacheChangeDiscoveryMessage.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java index 44f6002..6ed3ecc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java @@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.cache; import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -102,6 +105,12 @@ public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMe } /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ClientCacheChangeDummyDiscoveryMessage.class, this, "startCaches", (startReqs != null ? startReqs.keySet() : "")); http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index d5c820f..83459a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -19,7 +19,10 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import java.util.Set; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -73,6 +76,12 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { return false; } + /** {@inheritDoc} */ + @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, + DiscoCache discoCache) { + return mgr.createDiscoCacheOnCacheChange(topVer, discoCache); + } + /** * @return Collection of change requests. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java index ef5370e..0416746 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java @@ -16,7 +16,10 @@ */ package org.apache.ignite.internal.processors.cache.binary; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -67,6 +70,12 @@ public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage { return true; } + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + /** */ int acceptedVersion() { return acceptedVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java index 715e668..f9bd660 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java @@ -20,7 +20,10 @@ import java.util.UUID; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.binary.BinaryMetadataHandler; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -130,6 +133,12 @@ public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessa return true; } + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + /** * @param err Error caused this update to be rejected. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java index 0771198..a1fbacf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java @@ -18,7 +18,10 @@ package org.apache.ignite.internal.processors.cluster; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -80,6 +83,12 @@ public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ChangeGlobalStateFinishMessage.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java index 6579399..6a642bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java @@ -19,7 +19,10 @@ package org.apache.ignite.internal.processors.cluster; import java.util.List; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -112,7 +115,13 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { return false; } - /** + /** {@inheritDoc} */ + @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, + DiscoCache discoCache) { + return mgr.createDiscoCacheOnCacheChange(topVer, discoCache); + } + + /** * @return Node initiated state change. */ public UUID initiatorNodeId() { http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java index 01a95df..e9754d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -18,8 +18,12 @@ package org.apache.ignite.internal.processors.continuous; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * @@ -57,4 +61,10 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag @Override public boolean isMutable() { return false; } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index 1765f2c..4063e05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -92,4 +92,4 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { @Override public String toString() { return S.toString(StartRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 320226b..82996d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -132,4 +132,4 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { @Override public String toString() { return S.toString(StartRoutineDiscoveryMessage.class, this, "routineId", routineId()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java index e6305c7..79d8b29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -45,4 +45,4 @@ public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage { @Override public String toString() { return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java index 30d12d1..f6b18fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java @@ -45,4 +45,4 @@ public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage { @Override public String toString() { return S.toString(StopRoutineDiscoveryMessage.class, this, "routineId", routineId()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java index 23c2858..7af0559 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.processors.marshaller; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -59,6 +62,12 @@ public class MappingAcceptedMessage implements DiscoveryCustomMessage { return false; } + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + /** */ MarshallerMappingItem getMappingItem() { return item; http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java index 33a2168..b4e13fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java @@ -18,7 +18,10 @@ package org.apache.ignite.internal.processors.marshaller; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -94,6 +97,12 @@ public class MappingProposedMessage implements DiscoveryCustomMessage { return true; } + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + /** */ MarshallerMappingItem mappingItem() { return mappingItem; http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java index 9fdc6c3..f55eae0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java @@ -17,11 +17,15 @@ package org.apache.ignite.internal.processors.query.schema.message; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * Abstract discovery message for schema operations. @@ -51,6 +55,12 @@ public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomM return id; } + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + /** * @return Operation. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java index 5fd2606..0e1270b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -99,9 +99,8 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag * @param err Error. */ public void onError(SchemaOperationException err) { - if (!hasError()) { + if (!hasError()) this.err = err; - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java new file mode 100644 index 0000000..c238a9a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests discovery cache reuse between topology events. + */ +public class IgniteDiscoveryCacheReuseSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } + + /** + * Tests correct reuse of discovery cache. + * + * @throws Exception If failed. + */ + public void testDiscoCacheReuseOnNodeJoin() throws Exception { + startGridsMultiThreaded(2); + + assertDiscoCacheReuse(new AffinityTopologyVersion(2, 0), new AffinityTopologyVersion(2, 1)); + } + + /** + * Assert disco cache reuse. + * + * @param v1 First version. + * @param v2 Next version. + */ + private void assertDiscoCacheReuse(AffinityTopologyVersion v1, AffinityTopologyVersion v2) { + for (Ignite ignite : G.allGrids()) { + GridBoundedConcurrentLinkedHashMap<AffinityTopologyVersion, DiscoCache> discoCacheHist = + U.field(((IgniteEx) ignite).context().discovery(), "discoCacheHist"); + + DiscoCache discoCache1 = discoCacheHist.get(v1); + DiscoCache discoCache2 = discoCacheHist.get(v2); + + assertEquals(v1, discoCache1.version()); + assertEquals(v2, discoCache2.version()); + + String[] props = new String[] { + "state", "loc", "rmtNodes", "allNodes", "srvNodes", "daemonNodes", "rmtNodesWithCaches", + "allCacheNodes", "allCacheNodes", "cacheGrpAffNodes", "nodeMap", "minNodeVer" + }; + + for (String prop : props) + assertSame(U.field(discoCache1, prop), U.field(discoCache2, prop)); + + assertNotSame(U.field(discoCache1, "alives"), U.field(discoCache2, "alives")); + assertEquals(U.field(discoCache1, "alives"), U.field(discoCache2, "alives")); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/14f04c4c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 3335797..ff4c9c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest; import org.apache.ignite.spi.discovery.AuthenticationRestartTest; +import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest; import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest; import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest; import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest; @@ -94,7 +95,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class)); suite.addTest(new TestSuite(AuthenticationRestartTest.class)); - //Client connect + // Client connect. suite.addTest(new TestSuite(IgniteClientConnectTest.class)); suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class)); @@ -104,6 +105,9 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoverySslSecuredUnsecuredTest.class)); suite.addTest(new TestSuite(TcpDiscoverySslTrustedUntrustedTest.class)); + // Disco cache reuse. + suite.addTest(new TestSuite(IgniteDiscoveryCacheReuseSelfTest.class)); + return suite; } }
