ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/090b4402 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/090b4402 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/090b4402 Branch: refs/heads/ignite-5075 Commit: 090b440276078dda614c20b4beeff94bac950e12 Parents: d52dbb8 Author: sboikov <sboi...@gridgain.com> Authored: Thu May 4 14:45:19 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu May 4 14:45:19 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 17 +++++---- .../processors/cache/GridCacheIoManager.java | 15 +++----- .../dht/GridDhtAffinityAssignmentRequest.java | 36 ++++++++++++++++++-- .../dht/GridDhtAssignmentFetchFuture.java | 21 ++++++------ 4 files changed, 59 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 45f463b..8c275e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -412,7 +412,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId()); if (clientCacheStarted) - initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign); + initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign); else if (!req.clientStartOnly()) { assert fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion()); @@ -835,7 +835,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(aff, fut, false); + initAffinity(registeredCaches.get(aff.cacheId()), aff, fut, false); } }); } @@ -847,7 +847,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param fetch Force fetch flag. * @throws IgniteCheckedException If failed. */ - private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch) + private void initAffinity(DynamicCacheDescriptor desc, + GridAffinityAssignmentCache aff, + GridDhtPartitionsExchangeFuture fut, + boolean fetch) throws IgniteCheckedException { if (!fetch && canCalculateAffinity(aff, fut)) { List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); @@ -856,7 +859,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - aff.cacheName(), + desc, fut.topologyVersion(), fut.discoCache()); @@ -985,7 +988,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - cacheCtx.name(), + cacheDesc, topVer, fut.discoCache()); @@ -1094,7 +1097,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cacheCtx.isLocal()) continue; - initAffinity(cacheCtx.affinity().affinityCache(), fut, false); + initAffinity(registeredCaches.get(cacheCtx.cacheId()), cacheCtx.affinity().affinityCache(), fut, false); } } @@ -1152,7 +1155,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev; GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - aff.cacheName(), + desc, prev.topologyVersion(), prev.discoCache()); http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index a80213d..5e7e401 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -146,23 +146,18 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) { assert cacheMsg.topologyVersion() != null : cacheMsg; - AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order()); - - DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId()); - - // TODO: should be specified in request since cache desc can be removed, - if (cacheDesc != null) - startTopVer = cacheDesc.startTopologyVersion(); + AffinityTopologyVersion waitVer = + ((GridDhtAffinityAssignmentRequest)cacheMsg).waitTopologyVersion(); // Need to wait for exchange to avoid race between cache start and affinity request. - fut = cctx.exchange().affinityReadyFuture(startTopVer); + fut = cctx.exchange().affinityReadyFuture(waitVer); if (fut != null && !fut.isDone()) { if (log.isDebugEnabled()) { log.debug("Wait for exchange before processing message [msg=" + msg + ", node=" + nodeId + - ", waitVer=" + startTopVer + - ", cacheDesc=" + cacheDesc + ']'); + ", waitVer=" + waitVer + + ", cacheDesc=" + cctx.cache().cacheDescriptor(cacheMsg.cacheId()) + ']'); } fut.listen(new CI1<IgniteInternalFuture<?>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java index 94f11ed..0b3080e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.NotNull; /** * Affinity assignment request. @@ -35,6 +34,9 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { /** Topology version being queried. */ private AffinityTopologyVersion topVer; + /** */ + private AffinityTopologyVersion waitTopVer; + /** * Empty constructor. */ @@ -45,10 +47,24 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { /** * @param cacheId Cache ID. * @param topVer Topology version. + * @param waitTopVer Topology version to wait for before message processing. */ - public GridDhtAffinityAssignmentRequest(int cacheId, @NotNull AffinityTopologyVersion topVer) { + public GridDhtAffinityAssignmentRequest(int cacheId, + AffinityTopologyVersion topVer, + AffinityTopologyVersion waitTopVer) { + assert topVer != null; + assert waitTopVer != null; + this.cacheId = cacheId; this.topVer = topVer; + this.waitTopVer = waitTopVer; + } + + /** + * @return Topology version to wait for before message processing. + */ + public AffinityTopologyVersion waitTopologyVersion() { + return waitTopVer; } /** {@inheritDoc} */ @@ -75,7 +91,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 5; } /** {@inheritDoc} */ @@ -99,6 +115,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { writer.incrementState(); + case 4: + if (!writer.writeMessage("waitTopVer", waitTopVer)) + return false; + + writer.incrementState(); + } return true; @@ -123,6 +145,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { reader.incrementState(); + case 4: + waitTopVer = reader.readMessage("waitTopVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtAffinityAssignmentRequest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 4f94ae2..1d6563e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -32,12 +32,12 @@ import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -48,9 +48,6 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF * Future that fetches affinity assignment from remote cache nodes. */ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@ -71,23 +68,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin @GridToStringInclude private final T2<Integer, AffinityTopologyVersion> key; + /** */ + private final DynamicCacheDescriptor cacheDesc; + /** * @param ctx Context. - * @param cacheName Cache name. + * @param cacheDesc Cache descriptor. * @param topVer Topology version. * @param discoCache Discovery cache. */ public GridDhtAssignmentFetchFuture( GridCacheSharedContext ctx, - String cacheName, + DynamicCacheDescriptor cacheDesc, AffinityTopologyVersion topVer, DiscoCache discoCache ) { this.ctx = ctx; - int cacheId = CU.cacheId(cacheName); - this.key = new T2<>(cacheId, topVer); + this.cacheDesc = cacheDesc; + this.key = new T2<>(cacheDesc.cacheId(), topVer); - Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId); + Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId()); LinkedList<ClusterNode> tmp = new LinkedList<>(); @@ -188,7 +188,8 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() + ", node=" + node + ']'); - ctx.io().send(node, new GridDhtAffinityAssignmentRequest(key.get1(), key.get2()), + ctx.io().send(node, + new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), cacheDesc.startTopologyVersion()), AFFINITY_POOL); // Close window for listener notification.