ignite-5075

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/090b4402
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/090b4402
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/090b4402

Branch: refs/heads/ignite-5075
Commit: 090b440276078dda614c20b4beeff94bac950e12
Parents: d52dbb8
Author: sboikov <sboi...@gridgain.com>
Authored: Thu May 4 14:45:19 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu May 4 14:45:19 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 17 +++++----
 .../processors/cache/GridCacheIoManager.java    | 15 +++-----
 .../dht/GridDhtAffinityAssignmentRequest.java   | 36 ++++++++++++++++++--
 .../dht/GridDhtAssignmentFetchFuture.java       | 21 ++++++------
 4 files changed, 59 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 45f463b..8c275e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -412,7 +412,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         req.clientStartOnly() && 
req.initiatingNodeId().equals(cctx.localNodeId());
 
                     if (clientCacheStarted)
-                        initAffinity(cacheCtx.affinity().affinityCache(), fut, 
lateAffAssign);
+                        initAffinity(cacheDesc, 
cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
                     else if (!req.clientStartOnly()) {
                         assert 
fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion());
 
@@ -835,7 +835,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             forAllCaches(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(aff, fut, false);
+                        initAffinity(registeredCaches.get(aff.cacheId()), aff, 
fut, false);
                 }
             });
         }
@@ -847,7 +847,10 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param fetch Force fetch flag.
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinity(GridAffinityAssignmentCache aff, 
GridDhtPartitionsExchangeFuture fut, boolean fetch)
+    private void initAffinity(DynamicCacheDescriptor desc,
+        GridAffinityAssignmentCache aff,
+        GridDhtPartitionsExchangeFuture fut,
+        boolean fetch)
         throws IgniteCheckedException {
         if (!fetch && canCalculateAffinity(aff, fut)) {
             List<List<ClusterNode>> assignment = 
aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
@@ -856,7 +859,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new 
GridDhtAssignmentFetchFuture(cctx,
-                aff.cacheName(),
+                desc,
                 fut.topologyVersion(),
                 fut.discoCache());
 
@@ -985,7 +988,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             }
             else {
                 GridDhtAssignmentFetchFuture fetchFut = new 
GridDhtAssignmentFetchFuture(cctx,
-                    cacheCtx.name(),
+                    cacheDesc,
                     topVer,
                     fut.discoCache());
 
@@ -1094,7 +1097,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             if (cacheCtx.isLocal())
                 continue;
 
-            initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
+            initAffinity(registeredCaches.get(cacheCtx.cacheId()), 
cacheCtx.affinity().affinityCache(), fut, false);
         }
     }
 
@@ -1152,7 +1155,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     assert 
prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
 
                     GridDhtAssignmentFetchFuture fetchFut = new 
GridDhtAssignmentFetchFuture(cctx,
-                        aff.cacheName(),
+                        desc,
                         prev.topologyVersion(),
                         prev.discoCache());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a80213d..5e7e401 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,23 +146,18 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
                 if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
                     assert cacheMsg.topologyVersion() != null : cacheMsg;
 
-                    AffinityTopologyVersion startTopVer = new 
AffinityTopologyVersion(cctx.localNode().order());
-
-                    DynamicCacheDescriptor cacheDesc = 
cctx.cache().cacheDescriptor(cacheMsg.cacheId());
-
-                    // TODO: should be specified in request since cache desc 
can be removed,
-                    if (cacheDesc != null)
-                        startTopVer = cacheDesc.startTopologyVersion();
+                    AffinityTopologyVersion waitVer =
+                        
((GridDhtAffinityAssignmentRequest)cacheMsg).waitTopologyVersion();
 
                     // Need to wait for exchange to avoid race between cache 
start and affinity request.
-                    fut = cctx.exchange().affinityReadyFuture(startTopVer);
+                    fut = cctx.exchange().affinityReadyFuture(waitVer);
 
                     if (fut != null && !fut.isDone()) {
                         if (log.isDebugEnabled()) {
                             log.debug("Wait for exchange before processing 
message [msg=" + msg +
                                 ", node=" + nodeId +
-                                ", waitVer=" + startTopVer +
-                                ", cacheDesc=" + cacheDesc + ']');
+                                ", waitVer=" + waitVer +
+                                ", cacheDesc=" + 
cctx.cache().cacheDescriptor(cacheMsg.cacheId()) + ']');
                         }
 
                         fut.listen(new CI1<IgniteInternalFuture<?>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 94f11ed..0b3080e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -23,7 +23,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Affinity assignment request.
@@ -35,6 +34,9 @@ public class GridDhtAffinityAssignmentRequest extends 
GridCacheMessage {
     /** Topology version being queried. */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    private AffinityTopologyVersion waitTopVer;
+
     /**
      * Empty constructor.
      */
@@ -45,10 +47,24 @@ public class GridDhtAffinityAssignmentRequest extends 
GridCacheMessage {
     /**
      * @param cacheId Cache ID.
      * @param topVer Topology version.
+     * @param waitTopVer Topology version to wait for before message 
processing.
      */
-    public GridDhtAffinityAssignmentRequest(int cacheId, @NotNull 
AffinityTopologyVersion topVer) {
+    public GridDhtAffinityAssignmentRequest(int cacheId,
+        AffinityTopologyVersion topVer,
+        AffinityTopologyVersion waitTopVer) {
+        assert topVer != null;
+        assert waitTopVer != null;
+
         this.cacheId = cacheId;
         this.topVer = topVer;
+        this.waitTopVer = waitTopVer;
+    }
+
+    /**
+     * @return Topology version to wait for before message processing.
+     */
+    public AffinityTopologyVersion waitTopologyVersion() {
+        return waitTopVer;
     }
 
     /** {@inheritDoc} */
@@ -75,7 +91,7 @@ public class GridDhtAffinityAssignmentRequest extends 
GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 5;
     }
 
     /** {@inheritDoc} */
@@ -99,6 +115,12 @@ public class GridDhtAffinityAssignmentRequest extends 
GridCacheMessage {
 
                 writer.incrementState();
 
+            case 4:
+                if (!writer.writeMessage("waitTopVer", waitTopVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -123,6 +145,14 @@ public class GridDhtAffinityAssignmentRequest extends 
GridCacheMessage {
 
                 reader.incrementState();
 
+            case 4:
+                waitTopVer = reader.readMessage("waitTopVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtAffinityAssignmentRequest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/090b4402/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 4f94ae2..1d6563e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -32,12 +32,12 @@ import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -48,9 +48,6 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF
  * Future that fetches affinity assignment from remote cache nodes.
  */
 public class GridDhtAssignmentFetchFuture extends 
GridFutureAdapter<GridDhtAffinityAssignmentResponse> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
 
@@ -71,23 +68,26 @@ public class GridDhtAssignmentFetchFuture extends 
GridFutureAdapter<GridDhtAffin
     @GridToStringInclude
     private final T2<Integer, AffinityTopologyVersion> key;
 
+    /** */
+    private final DynamicCacheDescriptor cacheDesc;
+
     /**
      * @param ctx Context.
-     * @param cacheName Cache name.
+     * @param cacheDesc Cache descriptor.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        String cacheName,
+        DynamicCacheDescriptor cacheDesc,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
         this.ctx = ctx;
-        int cacheId = CU.cacheId(cacheName);
-        this.key = new T2<>(cacheId, topVer);
+        this.cacheDesc = cacheDesc;
+        this.key = new T2<>(cacheDesc.cacheId(), topVer);
 
-        Collection<ClusterNode> availableNodes = 
discoCache.cacheAffinityNodes(cacheId);
+        Collection<ClusterNode> availableNodes = 
discoCache.cacheAffinityNodes(cacheDesc.cacheId());
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -188,7 +188,8 @@ public class GridDhtAssignmentFetchFuture extends 
GridFutureAdapter<GridDhtAffin
                         log0.debug("Sending affinity fetch request to remote 
node [locNodeId=" + ctx.localNodeId() +
                             ", node=" + node + ']');
 
-                    ctx.io().send(node, new 
GridDhtAffinityAssignmentRequest(key.get1(), key.get2()),
+                    ctx.io().send(node,
+                        new GridDhtAffinityAssignmentRequest(key.get1(), 
key.get2(), cacheDesc.startTopologyVersion()),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

Reply via email to