ignite-5578 Affinity for local join

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a46272c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a46272c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a46272c

Branch: refs/heads/ignite-5578
Commit: 4a46272c61821e90e48c1e843f5dd1eda0320a09
Parents: 0e7064d
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Jul 12 18:25:39 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jul 12 18:37:55 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   4 +-
 .../cache/CacheAffinitySharedManager.java       |   1 +
 .../dht/preloader/CacheGroupAffinity.java       | 159 -------------
 .../preloader/CacheGroupAffinityMessage.java    | 229 +++++++++++++++++++
 .../GridDhtPartitionsExchangeFuture.java        |  91 ++------
 .../preloader/GridDhtPartitionsFullMessage.java |  11 +-
 .../CacheLateAffinityAssignmentTest.java        |   2 +-
 7 files changed, 262 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 261a619..003c2f0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -81,7 +81,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinity;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -871,7 +871,7 @@ public class GridIoMessageFactory implements MessageFactory 
{
                 break;
 
             case 128:
-                msg = new CacheGroupAffinity();
+                msg = new CacheGroupAffinityMessage();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 3f24547..879e6a9a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1460,6 +1460,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
     /**
      * @param fut Exchange future.
+     * @param newAff {@code True} if there are no older nodes with affinity 
info available.
      * @throws IgniteCheckedException If failed.
      * @return Future completed when caches initialization is done.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
deleted file mode 100644
index 1e1509a..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CacheGroupAffinity implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private int grpId;
-
-    /** */
-    @GridDirectCollection(GridLongList.class)
-    private List<GridLongList> assign;
-
-    /**
-     *
-     */
-    public CacheGroupAffinity() {
-        // No-op.
-    }
-
-    /**
-     * @param grpId Group ID.
-     * @param assign0 Assignment.
-     */
-    CacheGroupAffinity(int grpId, List<List<ClusterNode>> assign0) {
-        this.grpId = grpId;
-
-        assign = new ArrayList<>(assign0.size());
-
-        for (int i = 0; i < assign0.size(); i++) {
-            List<ClusterNode> nodes = assign0.get(i);
-
-            GridLongList l = new GridLongList(nodes.size());
-
-            for (int n = 0; n < nodes.size(); n++)
-                l.add(nodes.get(n).order());
-
-            assign.add(l);
-        }
-    }
-
-    /**
-     * @return Cache group ID.
-     */
-    int groupId() {
-        return grpId;
-    }
-
-    /**
-     * @return Assignments.
-     */
-    List<GridLongList> assignments() {
-        return assign;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeCollection("assign", assign, 
MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeInt("grpId", grpId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                assign = reader.readCollection("assign", 
MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                grpId = reader.readInt("grpId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CacheGroupAffinity.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 128;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
new file mode 100644
index 0000000..5cd5d26
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class CacheGroupAffinityMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private int grpId;
+
+    /** */
+    @GridDirectCollection(GridLongList.class)
+    private List<GridLongList> assigns;
+
+    /**
+     *
+     */
+    public CacheGroupAffinityMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param grpId Group ID.
+     * @param assign0 Assignment.
+     */
+    private CacheGroupAffinityMessage(int grpId, List<List<ClusterNode>> 
assign0) {
+        this.grpId = grpId;
+
+        assigns = new ArrayList<>(assign0.size());
+
+        for (int i = 0; i < assign0.size(); i++) {
+            List<ClusterNode> nodes = assign0.get(i);
+
+            GridLongList l = new GridLongList(nodes.size());
+
+            for (int n = 0; n < nodes.size(); n++)
+                l.add(nodes.get(n).order());
+
+            assigns.add(l);
+        }
+    }
+
+    /**
+     * @return Cache group ID.
+     */
+    int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @param cctx Context.
+     * @param topVer Topology version.
+     * @param affReq Cache group IDs.
+     * @param cachesAff Optional already prepared affinity.
+     * @return Affinity.
+     */
+    static Map<Integer, CacheGroupAffinityMessage> createAffinityMessages(
+        GridCacheSharedContext cctx,
+        AffinityTopologyVersion topVer,
+        Collection<Integer> affReq,
+        @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff) {
+        assert !F.isEmpty(affReq);
+
+        if (cachesAff == null)
+            cachesAff = U.newHashMap(affReq.size());
+
+        for (Integer grpId : affReq) {
+            if (!cachesAff.containsKey(grpId)) {
+                List<List<ClusterNode>> assign = 
cctx.affinity().affinity(grpId).assignments(topVer);
+
+                cachesAff.put(grpId, new CacheGroupAffinityMessage(grpId, 
assign));
+            }
+        }
+
+        return cachesAff;
+    }
+
+    /**
+     * @param nodesByOrder Nodes by order cache.
+     * @param discoCache Discovery data cache.
+     * @return Assignments.
+     */
+    List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> 
nodesByOrder, DiscoCache discoCache) {
+        List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+
+        for (int p = 0; p < assigns.size(); p++) {
+            GridLongList assign = assigns.get(p);
+            List<ClusterNode> assign0 = new ArrayList<>(assign.size());
+
+            for (int n = 0; n < assign.size(); n++) {
+                long order = assign.get(n);
+
+                ClusterNode affNode = nodesByOrder.get(order);
+
+                if (affNode == null) {
+                    affNode = discoCache.serverNodeByOrder(order);
+
+                    assert affNode != null : order;
+
+                    nodesByOrder.put(order, affNode);
+                }
+
+                assign0.add(affNode);
+            }
+
+            assignments0.add(assign0);
+        }
+
+        return assignments0;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeCollection("assigns", assigns, 
MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("grpId", grpId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                assigns = reader.readCollection("assigns", 
MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                grpId = reader.readInt("grpId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CacheGroupAffinityMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 128;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheGroupAffinityMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4a39bae..ab66df3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -77,7 +77,6 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1183,7 +1182,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param cachesAff Affinity if was requested by some nodes.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes, 
Collection<CacheGroupAffinity> cachesAff)
+    private void sendAllPartitions(Collection<ClusterNode> nodes, 
Collection<CacheGroupAffinityMessage> cachesAff)
         throws IgniteCheckedException {
         boolean singleNode = nodes.size() == 1;
 
@@ -1443,7 +1442,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         return;
                     }
 
-                    processMessage(node.id(), msg);
+                    processSingleMessage(node.id(), msg);
                 }
             });
         }
@@ -1453,7 +1452,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param nodeId Sender node.
      * @param msg Message.
      */
-    private void processMessage(UUID nodeId, GridDhtPartitionsSingleMessage 
msg) {
+    private void processSingleMessage(UUID nodeId, 
GridDhtPartitionsSingleMessage msg) {
         boolean allReceived = false;
         boolean updateSingleMap = false;
 
@@ -1723,29 +1722,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param affReq Cache group IDs.
-     * @param cachesAff Optional already prepared affinity.
-     * @return Affinity.
-     */
-    private Map<Integer, CacheGroupAffinity> 
initCachesAffinity(Collection<Integer> affReq,
-       @Nullable Map<Integer, CacheGroupAffinity> cachesAff) {
-       assert !F.isEmpty(affReq);
-
-        if (cachesAff == null)
-            cachesAff = U.newHashMap(affReq.size());
-
-        for (Integer grpId : affReq) {
-            if (!cachesAff.containsKey(grpId)) {
-                List<List<ClusterNode>> assign = 
cctx.affinity().affinity(grpId).assignments(topologyVersion());
-
-                cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
-            }
-        }
-
-        return cachesAff;
-    }
-
-    /**
      *
      */
     private void onAllReceived() {
@@ -1761,7 +1737,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
-            Map<Integer, CacheGroupAffinity> cachesAff = null;
+            Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
msgs.entrySet()) {
                 GridDhtPartitionsSingleMessage msg = e.getValue();
@@ -1784,7 +1760,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
                 if (affReq != null) {
-                    cachesAff = initCachesAffinity(affReq, cachesAff);
+                    cachesAff = 
CacheGroupAffinityMessage.createAffinityMessages(cctx,
+                        topologyVersion(),
+                        affReq,
+                        cachesAff);
 
                     UUID nodeId = e.getKey();
 
@@ -1930,10 +1909,14 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (n != null) {
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-                Collection<CacheGroupAffinity> cachesAff = null;
+                Collection<CacheGroupAffinityMessage> cachesAff = null;
 
                 if (affReq != null) {
-                    Map<Integer, CacheGroupAffinity> affMap = 
initCachesAffinity(affReq, null);
+                    Map<Integer, CacheGroupAffinityMessage> affMap = 
CacheGroupAffinityMessage.createAffinityMessages(
+                        cctx,
+                        msg.exchangeId().topologyVersion(),
+                        affReq,
+                        null);
 
                     cachesAff = affMap.values();
                 }
@@ -2004,7 +1987,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     return;
                 }
 
-                processMessage(node, msg);
+                processFullMessage(node, msg);
             }
         });
     }
@@ -2038,7 +2021,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param node Sender node.
      * @param msg Message.
      */
-    private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage 
msg) {
+    private void processFullMessage(ClusterNode node, 
GridDhtPartitionsFullMessage msg) {
         assert exchId.equals(msg.exchangeId()) : msg;
         assert msg.lastVersion() != null : msg;
 
@@ -2065,50 +2048,27 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         if (localJoinExchange() && affReq != null) {
             Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 
-            Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity();
+            Collection<CacheGroupAffinityMessage> cachesAff = 
msg.cachesAffinity();
 
             assert !F.isEmpty(cachesAff) : msg;
             assert cachesAff.size() >= affReq.size();
 
             int cnt = 0;
 
-            for (CacheGroupAffinity aff : cachesAff) {
+            for (CacheGroupAffinityMessage aff : cachesAff) {
                 if (affReq.contains(aff.groupId())) {
                     CacheGroupContext grp = 
cctx.cache().cacheGroup(aff.groupId());
 
                     assert grp != null : aff.groupId();
                     assert 
AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
 
-                    List<GridLongList> assignments = aff.assignments();
-                    List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
-
-                    for (int p = 0; p < assignments.size(); p++) {
-                        GridLongList assign = assignments.get(p);
-                        List<ClusterNode> assign0 = new 
ArrayList<>(assign.size());
-
-                        for (int n = 0; n < assign.size(); n++) {
-                            long order = assign.get(n);
-
-                            ClusterNode affNode = nodesByOrder.get(order);
-
-                            if (affNode == null) {
-                                affNode = discoCache.serverNodeByOrder(order);
-
-                                assert affNode != null : order;
-
-                                nodesByOrder.put(order, affNode);
-                            }
-
-                            assign0.add(affNode);
-                        }
-
-                        assignments0.add(assign0);
-                    }
+                    List<List<ClusterNode>> assignments = 
aff.createAssignments(nodesByOrder, discoCache);
 
+                    // Calculate ideal assignments.
                     if (!grp.affinity().centralizedAffinityFunction())
                         grp.affinity().calculate(topologyVersion(), discoEvt, 
discoCache);
 
-                    grp.affinity().initialize(topologyVersion(), assignments0);
+                    grp.affinity().initialize(topologyVersion(), assignments);
 
                     try {
                         grp.topology().initPartitions(this);
@@ -2309,7 +2269,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     try {
                         boolean crdChanged = false;
                         boolean allReceived = false;
-                        Set<UUID> remaining0 = null;
 
                         ClusterNode crd0;
 
@@ -2328,9 +2287,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                             }
 
                             if (crd != null && crd.isLocal()) {
-                                if (crdChanged)
-                                    remaining0 = new HashSet<>(remaining);
-                                else if (crdReady && rmvd)
+                                if (!crdChanged && crdReady && rmvd)
                                     allReceived = remaining.isEmpty();
                             }
 
@@ -2390,7 +2347,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                                 sendPartitions(crd0);
 
                                 for (Map.Entry<ClusterNode, 
GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
-                                    processMessage(m.getKey(), m.getValue());
+                                    processFullMessage(m.getKey(), 
m.getValue());
                             }
                         }
                     }
@@ -2447,7 +2404,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : 
singleMsgs.entrySet())
-                processMessage(m.getKey(), m.getValue());
+                processSingleMessage(m.getKey(), m.getValue());
         }
         else {
             awaitSingleMapUpdates();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 1ef383a..edc9c9e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -33,7 +33,6 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -104,8 +103,8 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     private transient boolean compress;
 
     /** */
-    @GridDirectCollection(CacheGroupAffinity.class)
-    private Collection<CacheGroupAffinity> cachesAff;
+    @GridDirectCollection(CacheGroupAffinityMessage.class)
+    private Collection<CacheGroupAffinityMessage> cachesAff;
 
     /**
      * Required by {@link Externalizable}.
@@ -156,7 +155,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
      * @param cachesAff Affinity.
      * @return Message copy.
      */
-    GridDhtPartitionsFullMessage 
copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) {
+    GridDhtPartitionsFullMessage 
copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
         assert !F.isEmpty(cachesAff) : cachesAff;
 
         GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
@@ -171,14 +170,14 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     /**
      * @return Affinity.
      */
-    @Nullable Collection<CacheGroupAffinity> cachesAffinity() {
+    @Nullable Collection<CacheGroupAffinityMessage> cachesAffinity() {
         return cachesAff;
     }
 
     /**
      * @param cachesAff Affinity.
      */
-    void cachesAffinity(Collection<CacheGroupAffinity> cachesAff) {
+    void cachesAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
         this.cachesAff = cachesAff;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index bb99266..23043d1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -1458,7 +1458,7 @@ public class CacheLateAffinityAssignmentTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void _testRandomOperations() throws Exception {
+    public void testRandomOperations() throws Exception {
         forceSrvMode = true;
 
         final int MAX_SRVS = 10;

Reply via email to