Repository: ignite Updated Branches: refs/heads/ignite-5075-cacheStart a9317a4c2 -> 5a024a5f1
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a024a5f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a024a5f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a024a5f Branch: refs/heads/ignite-5075-cacheStart Commit: 5a024a5f113d1a3edb427e67fbe0e68a9a5c3a1f Parents: a9317a4 Author: sboikov <[email protected]> Authored: Fri May 12 15:50:49 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri May 12 15:50:49 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ClusterCachesInfo.java | 22 ++++----- .../cache/DynamicCacheDescriptor.java | 47 +++++++++++++++++--- .../processors/cache/GridCacheIoManager.java | 18 +++++--- 3 files changed, 64 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index efcf6a8..da36470 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -337,15 +337,17 @@ class ClusterCachesInfo { if (needExchange) { req.clientStartOnly(true); - desc.localStartVersion(topVer.nextMinorVersion()); + desc.clientCacheStartVersion(topVer.nextMinorVersion()); exchangeActions.addClientCacheToStart(req, desc); } } - if (!needExchange) { - if (desc != null) - waitTopVer = desc.localStartVersion(); + if (!needExchange && desc != null) { + if (desc.clientCacheStartVersion() != null) + waitTopVer = desc.clientCacheStartVersion(); + else + waitTopVer = desc.startTopologyVersion(); } } else if (req.globalStateChange()) @@ -397,7 +399,7 @@ class ClusterCachesInfo { for (DynamicCacheDescriptor desc : addedDescs) { assert desc.template() || incMinorTopVer; - desc.localStartVersion(startTopVer); + desc.startTopologyVersion(startTopVer); } } @@ -541,8 +543,6 @@ class ClusterCachesInfo { processJoiningNode(joinDiscoData, node.id()); for (DynamicCacheDescriptor desc : registeredCaches.values()) { - desc.localStartVersion(topVer); - CacheConfiguration cfg = desc.cacheConfiguration(); CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName()); @@ -559,7 +559,9 @@ class ClusterCachesInfo { desc.deploymentId(), desc.schema()); - desc0.localStartVersion(desc.localStartVersion()); + desc0.startTopologyVersion(desc.startTopologyVersion()); + desc0.receivedFromStartVersion(desc.receivedFromStartVersion()); + desc0.clientCacheStartVersion(desc.clientCacheStartVersion()); desc0.receivedFrom(desc.receivedFrom()); desc0.staticallyConfigured(desc.staticallyConfigured()); @@ -574,12 +576,12 @@ class ClusterCachesInfo { for (DynamicCacheDescriptor desc : registeredCaches.values()) { if (node.id().equals(desc.receivedFrom())) - desc.localStartVersion(topVer); + desc.receivedFromStartVersion(topVer); } for (DynamicCacheDescriptor desc : registeredTemplates.values()) { if (node.id().equals(desc.receivedFrom())) - desc.localStartVersion(topVer); + desc.receivedFromStartVersion(topVer); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 5c7060c..cec1828 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -73,7 +73,13 @@ public class DynamicCacheDescriptor { private volatile CacheObjectContext objCtx; /** */ - private volatile transient AffinityTopologyVersion locStartVer; + private AffinityTopologyVersion startTopVer; + + /** */ + private AffinityTopologyVersion rcvdFromVer; + + /** */ + private AffinityTopologyVersion clientCacheStartVer; /** Mutex to control schema. */ private final Object schemaMux = new Object(); @@ -230,17 +236,46 @@ public class DynamicCacheDescriptor { } /** + * @return Topology version when node provided cache configuration was started. + */ + @Nullable public AffinityTopologyVersion receivedFromStartVersion() { + return rcvdFromVer; + } + + /** + * @param rcvdFromVer Topology version when node provided cache configuration was started. + */ + public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) { + this.rcvdFromVer = rcvdFromVer; + } + + + /** + * @return Start topology version. + */ + @Nullable public AffinityTopologyVersion startTopologyVersion() { + return startTopVer; + } + + /** + * @param startTopVer Start topology version. + */ + public void startTopologyVersion(AffinityTopologyVersion startTopVer) { + this.startTopVer = startTopVer; + } + + /** * @return Version when client cache on local node was started. */ - @Nullable AffinityTopologyVersion localStartVersion() { - return locStartVer; + @Nullable AffinityTopologyVersion clientCacheStartVersion() { + return clientCacheStartVer; } /** - * @param locStartVer Version when cache on local node was started. + * @param clientCacheStartVer Version when client cache on local node was started. */ - public void localStartVersion(AffinityTopologyVersion locStartVer) { - this.locStartVer = locStartVer; + public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) { + this.clientCacheStartVer = clientCacheStartVer; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index a8a4dcd..fdd29e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -146,22 +146,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) { assert cacheMsg.topologyVersion() != null : cacheMsg; - DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId()); + AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order()); - AffinityTopologyVersion waitVer = cacheDesc != null ? cacheDesc.localStartVersion() : null; + DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId()); - if (waitVer == null) - waitVer = new AffinityTopologyVersion(cctx.localNode().order()); + if (cacheDesc != null) { + if (cacheDesc.startTopologyVersion() != null) + startTopVer = cacheDesc.startTopologyVersion(); + else if (cacheDesc.receivedFromStartVersion() != null) + startTopVer = cacheDesc.receivedFromStartVersion(); + } // Need to wait for exchange to avoid race between cache start and affinity request. - fut = cctx.exchange().affinityReadyFuture(waitVer); + fut = cctx.exchange().affinityReadyFuture(startTopVer); if (fut != null && !fut.isDone()) { if (log.isDebugEnabled()) { log.debug("Wait for exchange before processing message [msg=" + msg + ", node=" + nodeId + - ", waitVer=" + waitVer + - ", cacheDesc=" + cctx.cache().cacheDescriptor(cacheMsg.cacheId()) + ']'); + ", waitVer=" + startTopVer + + ", cacheDesc=" + cacheDesc + ']'); } fut.listen(new CI1<IgniteInternalFuture<?>>() {
