This is an automated email from the ASF dual-hosted git repository.

jokser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new afe7933  IGNITE-10799 Optimize affinity recalculation in case of node join or leave - Fixes #6242.
afe7933 is described below

commit afe7933b156d51691997fefd251b76de5ea15e1a
Author: Pavel Kovalenko <jokse...@gmail.com>
AuthorDate: Wed Apr 10 16:26:30 2019 +0300

    IGNITE-10799 Optimize affinity recalculation in case of node join or leave - Fixes #6242.
    
    Signed-off-by: Pavel Kovalenko <jokse...@gmail.com>
---
 .../processors/affinity/AffinityAssignment.java    |   5 +
 .../affinity/GridAffinityAssignment.java           |   5 +
 .../affinity/GridAffinityAssignmentCache.java      | 126 +++++--
 .../affinity/GridAffinityAssignmentV2.java         |  23 +-
 .../affinity/HistoryAffinityAssignmentImpl.java    |  10 +
 .../HistoryAffinityAssignmentShallowCopy.java      |   5 +
 .../affinity/IdealAffinityAssignment.java          | 148 +++++++++
 .../cache/CacheAffinitySharedManager.java          | 362 +++++++++++++++++----
 .../processors/cache/ExchangeDiscoveryEvents.java  |  33 +-
 .../processors/cache/GridCacheAffinityManager.java |   2 +-
 .../dht/preloader/CacheGroupAffinityMessage.java   |   2 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java |   4 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |   2 +-
 13 files changed, 616 insertions(+), 111 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index 62adaa7..b8b1089 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -100,6 +100,11 @@ public interface AffinityAssignment {
     public Set<Integer> backupPartitions(UUID nodeId);
 
     /**
+     * @return Set of partitions which primary is different to primary in 
ideal assignment.
+     */
+    public Set<Integer> partitionPrimariesDifferentToIdeal();
+
+    /**
      * Converts List of Cluster Nodes to HashSet of UUIDs wrapped as 
unmodifiable collection.
      * @param assignmentPart Source assignment per partition.
      * @return List of deduplicated collections if ClusterNode's ids.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 0d68226..8feb39e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -239,6 +239,11 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
         return set == null ? Collections.<Integer>emptySet() : set;
     }
 
+    /** {@inheritDoc} */
+    @Override public Set<Integer> partitionPrimariesDifferentToIdeal() {
+        return Collections.emptySet();
+    }
+
     /**
      * Initializes primary and backup maps.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 0f468ac..95e26b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -96,13 +96,13 @@ public class GridAffinityAssignmentCache {
     private final ConcurrentNavigableMap<AffinityTopologyVersion, 
HistoryAffinityAssignment> affCache;
 
     /** */
-    private List<List<ClusterNode>> idealAssignment;
+    private volatile IdealAffinityAssignment idealAssignment;
 
     /** */
-    private BaselineTopology baselineTopology;
+    private volatile IdealAffinityAssignment baselineAssignment;
 
     /** */
-    private List<List<ClusterNode>> baselineAssignment;
+    private BaselineTopology baselineTopology;
 
     /** Cache item corresponding to the head topology version. */
     private final AtomicReference<GridAffinityAssignmentV2> head;
@@ -208,7 +208,7 @@ public class GridAffinityAssignmentCache {
 
         assert idealAssignment != null;
 
-        GridAffinityAssignmentV2 assignment = new 
GridAffinityAssignmentV2(topVer, affAssignment, idealAssignment);
+        GridAffinityAssignmentV2 assignment = new 
GridAffinityAssignmentV2(topVer, affAssignment, idealAssignment.assignment());
 
         HistoryAffinityAssignmentImpl newHistEntry = new 
HistoryAffinityAssignmentImpl(assignment, backups);
 
@@ -238,14 +238,21 @@ public class GridAffinityAssignmentCache {
     /**
      * @param assignment Assignment.
      */
-    public void idealAssignment(List<List<ClusterNode>> assignment) {
-        this.idealAssignment = assignment;
+    public void idealAssignment(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> assignment) {
+        this.idealAssignment = IdealAffinityAssignment.create(topVer, 
assignment);
     }
 
     /**
      * @return Assignment.
      */
-    @Nullable public List<List<ClusterNode>> idealAssignment() {
+    @Nullable public List<List<ClusterNode>> idealAssignmentRaw() {
+        return idealAssignment != null ? idealAssignment.assignment() : null;
+    }
+
+    /**
+     *
+     */
+    @Nullable public IdealAffinityAssignment idealAssignment() {
         return idealAssignment;
     }
 
@@ -284,23 +291,27 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
-     * Calculates affinity cache for given topology version.
+     * Calculates ideal assignment for given topology version and events 
happened since last calculation.
      *
      * @param topVer Topology version to calculate affinity cache for.
      * @param events Discovery events that caused this topology version change.
      * @param discoCache Discovery cache.
-     * @return Affinity assignments.
+     * @return Ideal affinity assignment.
      */
-    public List<List<ClusterNode>> calculate(
+    public IdealAffinityAssignment calculate(
         AffinityTopologyVersion topVer,
         @Nullable ExchangeDiscoveryEvents events,
         @Nullable DiscoCache discoCache
     ) {
         if (log.isDebugEnabled())
-            log.debug("Calculating affinity [topVer=" + topVer + ", 
locNodeId=" + ctx.localNodeId() +
+            log.debug("Calculating ideal affinity [topVer=" + topVer + ", 
locNodeId=" + ctx.localNodeId() +
                 ", discoEvts=" + events + ']');
 
-        List<List<ClusterNode>> prevAssignment = idealAssignment;
+        IdealAffinityAssignment prevAssignment = idealAssignment;
+
+        // Already calculated.
+        if (prevAssignment != null && 
prevAssignment.topologyVersion().equals(topVer))
+            return prevAssignment;
 
         // Resolve nodes snapshot for specified topology version.
         List<ClusterNode> sorted;
@@ -323,7 +334,7 @@ public class GridAffinityAssignmentCache {
                 
!discoCache.state().baselineTopology().equals(baselineTopology);
         }
 
-        List<List<ClusterNode>> assignment;
+        IdealAffinityAssignment assignment;
 
         if (prevAssignment != null && events != null) {
             /* Skip affinity calculation only when all nodes triggered exchange
@@ -343,23 +354,45 @@ public class GridAffinityAssignmentCache {
             if (skipCalculation)
                 assignment = prevAssignment;
             else if (hasBaseline && !changedBaseline) {
-                if (baselineAssignment == null)
-                    baselineAssignment = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(
-                        
discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter),
-                        prevAssignment, events.lastEvent(), topVer, backups));
+                if (baselineAssignment == null) {
+                    List<ClusterNode> baselineAffinityNodes = 
discoCache.state().baselineTopology()
+                        .createBaselineView(sorted, nodeFilter);
+
+                    List<List<ClusterNode>> calculated = 
aff.assignPartitions(new GridAffinityFunctionContextImpl(
+                        baselineAffinityNodes, prevAssignment != null ? 
prevAssignment.assignment() : null,
+                        events.lastEvent(), topVer, backups));
+
+                    baselineAssignment = 
IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated);
+                }
 
-                assignment = currentBaselineAssignment(topVer);
+                assignment = 
IdealAffinityAssignment.createWithPreservedPrimaries(
+                    topVer,
+                    baselineAssignmentWithoutOfflineNodes(topVer),
+                    baselineAssignment
+                );
             }
             else if (hasBaseline && changedBaseline) {
-                baselineAssignment = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(
-                    
discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter),
-                    prevAssignment, events.lastEvent(), topVer, backups));
+                List<ClusterNode> baselineAffinityNodes = 
discoCache.state().baselineTopology()
+                    .createBaselineView(sorted, nodeFilter);
+
+                List<List<ClusterNode>> calculated = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(
+                    baselineAffinityNodes, prevAssignment != null ? 
prevAssignment.assignment() : null,
+                    events.lastEvent(), topVer, backups));
 
-                assignment = currentBaselineAssignment(topVer);
+                baselineAssignment = IdealAffinityAssignment.create(topVer, 
baselineAffinityNodes, calculated);
+
+                assignment = 
IdealAffinityAssignment.createWithPreservedPrimaries(
+                    topVer,
+                    baselineAssignmentWithoutOfflineNodes(topVer),
+                    baselineAssignment
+                );
             }
             else {
-                assignment = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(sorted, prevAssignment,
+                List<List<ClusterNode>> calculated = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(sorted,
+                    prevAssignment != null ? prevAssignment.assignment() : 
null,
                     events.lastEvent(), topVer, backups));
+
+                assignment = IdealAffinityAssignment.create(topVer, sorted, 
calculated);
             }
         }
         else {
@@ -369,15 +402,27 @@ public class GridAffinityAssignmentCache {
                 event = events.lastEvent();
 
             if (hasBaseline) {
-                baselineAssignment = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(
-                    
discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter),
-                    prevAssignment, event, topVer, backups));
+                List<ClusterNode> baselineAffinityNodes = 
discoCache.state().baselineTopology()
+                    .createBaselineView(sorted, nodeFilter);
+
+                List<List<ClusterNode>> calculated = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(
+                    baselineAffinityNodes, prevAssignment != null ? 
prevAssignment.assignment() : null,
+                    event, topVer, backups));
+
+                baselineAssignment = IdealAffinityAssignment.create(topVer, 
baselineAffinityNodes, calculated);
 
-                assignment = currentBaselineAssignment(topVer);
+                assignment = 
IdealAffinityAssignment.createWithPreservedPrimaries(
+                    topVer,
+                    baselineAssignmentWithoutOfflineNodes(topVer),
+                    baselineAssignment
+                );
             }
             else {
-                assignment = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(sorted, prevAssignment,
+                List<List<ClusterNode>> calculated = aff.assignPartitions(new 
GridAffinityFunctionContextImpl(sorted,
+                    prevAssignment != null ? prevAssignment.assignment() : 
null,
                     event, topVer, backups));
+
+                assignment = IdealAffinityAssignment.create(topVer, sorted, 
calculated);
             }
         }
 
@@ -386,7 +431,7 @@ public class GridAffinityAssignmentCache {
         idealAssignment = assignment;
 
         if (ctx.cache().cacheMode(cacheOrGrpName) == PARTITIONED && 
!ctx.clientNode())
-            printDistributionIfThresholdExceeded(assignment, sorted.size());
+            printDistributionIfThresholdExceeded(assignment.assignment(), 
sorted.size());
 
         if (hasBaseline) {
             baselineTopology = discoCache.state().baselineTopology();
@@ -398,7 +443,7 @@ public class GridAffinityAssignmentCache {
         }
 
         if (locCache)
-            initialize(topVer, assignment);
+            initialize(topVer, assignment.assignment());
 
         return assignment;
     }
@@ -407,7 +452,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Baseline assignment with filtered out offline nodes.
      */
-    private List<List<ClusterNode>> 
currentBaselineAssignment(AffinityTopologyVersion topVer) {
+    private List<List<ClusterNode>> 
baselineAssignmentWithoutOfflineNodes(AffinityTopologyVersion topVer) {
         Map<Object, ClusterNode> alives = new HashMap<>();
 
         for (ClusterNode node : ctx.discovery().nodes(topVer)) {
@@ -415,10 +460,12 @@ public class GridAffinityAssignmentCache {
                 alives.put(node.consistentId(), node);
         }
 
-        List<List<ClusterNode>> result = new 
ArrayList<>(baselineAssignment.size());
+        List<List<ClusterNode>> assignment = baselineAssignment.assignment();
 
-        for (int p = 0; p < baselineAssignment.size(); p++) {
-            List<ClusterNode> baselineMapping = baselineAssignment.get(p);
+        List<List<ClusterNode>> result = new ArrayList<>(assignment.size());
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> baselineMapping = assignment.get(p);
             List<ClusterNode> currentMapping = null;
 
             for (ClusterNode node : baselineMapping) {
@@ -604,6 +651,13 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * @param topVer Topology version.
+     */
+    public Set<Integer> 
partitionPrimariesDifferentToIdeal(AffinityTopologyVersion topVer) {
+        return cachedAffinity(topVer).partitionPrimariesDifferentToIdeal();
+    }
+
+    /**
      * Get primary partitions for specified node ID.
      *
      * @param nodeId Node ID to get primary partitions for.
@@ -789,9 +843,9 @@ public class GridAffinityAssignmentCache {
      */
     public void init(GridAffinityAssignmentCache aff) {
         assert aff.lastVersion().compareTo(lastVersion()) >= 0;
-        assert aff.idealAssignment() != null;
+        assert aff.idealAssignmentRaw() != null;
 
-        idealAssignment(aff.idealAssignment());
+        idealAssignment(aff.lastVersion(), aff.idealAssignmentRaw());
 
         AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion());
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java
index 4a8f9a4..baee696 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java
@@ -57,6 +57,9 @@ public class GridAffinityAssignmentV2 extends 
IgniteDataTransferObject implement
     /** Map of backup node partitions. */
     private Map<UUID, Set<Integer>> backup;
 
+    /** Set of partitions which primary is different than in ideal assignment. 
*/
+    private Set<Integer> primariesDifferentToIdeal;
+
     /** Assignment node IDs */
     private transient volatile List<Collection<UUID>> assignmentIds;
 
@@ -109,12 +112,15 @@ public class GridAffinityAssignmentV2 extends 
IgniteDataTransferObject implement
         // Temporary mirrors with modifiable partition's collections.
         Map<UUID, Set<Integer>> tmpPrimary = new HashMap<>();
         Map<UUID, Set<Integer>> tmpBackup = new HashMap<>();
+        Set<Integer> primariesDifferentToIdeal = new HashSet<>();
         boolean isPrimary;
 
         for (int partsCnt = assignment.size(), p = 0; p < partsCnt; p++) {
             isPrimary = true;
 
-            for (ClusterNode node : assignment.get(p)) {
+            List<ClusterNode> currentOwners = assignment.get(p);
+
+            for (ClusterNode node : currentOwners) {
                 UUID id = node.id();
 
                 Map<UUID, Set<Integer>> tmp = isPrimary ? tmpPrimary : 
tmpBackup;
@@ -130,10 +136,19 @@ public class GridAffinityAssignmentV2 extends 
IgniteDataTransferObject implement
 
                 isPrimary =  false;
             }
+
+            List<ClusterNode> idealOwners = p < idealAssignment.size() ? 
idealAssignment.get(p) : Collections.emptyList();
+
+            ClusterNode curPrimary = !currentOwners.isEmpty() ? 
currentOwners.get(0) : null;
+            ClusterNode idealPrimary = !idealOwners.isEmpty() ? 
idealOwners.get(0) : null;
+
+            if (curPrimary != null && !curPrimary.equals(idealPrimary))
+                primariesDifferentToIdeal.add(p);
         }
 
         primary = Collections.unmodifiableMap(tmpPrimary);
         backup = Collections.unmodifiableMap(tmpBackup);
+        this.primariesDifferentToIdeal = 
Collections.unmodifiableSet(primariesDifferentToIdeal);
     }
 
     /**
@@ -147,6 +162,7 @@ public class GridAffinityAssignmentV2 extends 
IgniteDataTransferObject implement
         idealAssignment = aff.idealAssignment;
         primary = aff.primary;
         backup = aff.backup;
+        primariesDifferentToIdeal = aff.primariesDifferentToIdeal;
     }
 
     /**
@@ -289,6 +305,11 @@ public class GridAffinityAssignmentV2 extends 
IgniteDataTransferObject implement
     }
 
     /** {@inheritDoc} */
+    @Override public Set<Integer> partitionPrimariesDifferentToIdeal() {
+        return Collections.unmodifiableSet(primariesDifferentToIdeal);
+    }
+
+    /** {@inheritDoc} */
     @Override public int hashCode() {
         return topVer.hashCode();
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
index 1eda706..1123756 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
@@ -56,6 +56,9 @@ public class HistoryAffinityAssignmentImpl implements 
HistoryAffinityAssignment
     /** Diff with ideal. */
     private final Map<Integer, char[]> assignmentDiff;
 
+    /** Partition primaries different to ideal. */
+    private final Set<Integer> partitionPrimariesDifferentToIdeal;
+
     /**
      * @param assign Assignment.
      * @param backups Backups.
@@ -63,6 +66,8 @@ public class HistoryAffinityAssignmentImpl implements 
HistoryAffinityAssignment
     public HistoryAffinityAssignmentImpl(AffinityAssignment assign, int 
backups) {
         topVer = assign.topologyVersion();
 
+        partitionPrimariesDifferentToIdeal = 
assign.partitionPrimariesDifferentToIdeal();
+
         if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION || backups > 
IGNITE_AFFINITY_BACKUPS_THRESHOLD) {
             assignment = assign.assignment();
 
@@ -320,6 +325,11 @@ public class HistoryAffinityAssignmentImpl implements 
HistoryAffinityAssignment
     }
 
     /** {@inheritDoc} */
+    @Override public Set<Integer> partitionPrimariesDifferentToIdeal() {
+        return Collections.unmodifiableSet(partitionPrimariesDifferentToIdeal);
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean requiresHistoryCleanup() {
         return true;
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
index 4fcea72..ac9bfa7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
@@ -96,6 +96,11 @@ public class HistoryAffinityAssignmentShallowCopy implements 
HistoryAffinityAssi
     }
 
     /** {@inheritDoc} */
+    @Override public Set<Integer> partitionPrimariesDifferentToIdeal() {
+        return histAssignment.partitionPrimariesDifferentToIdeal();
+    }
+
+    /** {@inheritDoc} */
     @Override public HistoryAffinityAssignment origin() {
         return histAssignment;
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/IdealAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/IdealAffinityAssignment.java
new file mode 100644
index 0000000..c930ec5
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/IdealAffinityAssignment.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class IdealAffinityAssignment {
+    /** Topology version. */
+    private final AffinityTopologyVersion topologyVersion;
+
+    /** Assignment. */
+    private final List<List<ClusterNode>> assignment;
+
+    /** Ideal primaries. */
+    private final Map<Object, Set<Integer>> idealPrimaries;
+
+    /**
+     * @param topologyVersion Topology version.
+     * @param assignment Assignment.
+     * @param idealPrimaries Ideal primaries.
+     */
+    private IdealAffinityAssignment(
+        AffinityTopologyVersion topologyVersion,
+        List<List<ClusterNode>> assignment,
+        Map<Object, Set<Integer>> idealPrimaries
+    ) {
+        this.topologyVersion = topologyVersion;
+        this.assignment = assignment;
+        this.idealPrimaries = idealPrimaries;
+    }
+
+    /**
+     * @param clusterNode Cluster node.
+     */
+    public Set<Integer> idealPrimaries(ClusterNode clusterNode) {
+        Object consistentId = clusterNode.consistentId();
+
+        assert consistentId != null : clusterNode;
+
+        return idealPrimaries.getOrDefault(consistentId, 
Collections.emptySet());
+    }
+
+    /**
+     * @param partition Partition.
+     */
+    public ClusterNode currentPrimary(int partition) {
+        return assignment.get(partition).get(0);
+    }
+
+    /**
+     *
+     */
+    public List<List<ClusterNode>> assignment() {
+        return assignment;
+    }
+
+    /**
+     *
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topologyVersion;
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @param assignment Assignment.
+     */
+    private static Map<Object, Set<Integer>> calculatePrimaries(
+        @Nullable List<ClusterNode> nodes,
+        List<List<ClusterNode>> assignment
+    ) {
+        int nodesSize = nodes != null ? nodes.size() : 100;
+
+        Map<Object, Set<Integer>> primaryPartitions = U.newHashMap(nodesSize);
+
+        for (int size = assignment.size(), p = 0; p < size; p++) {
+            List<ClusterNode> affinityNodes = assignment.get(p);
+
+            if (!affinityNodes.isEmpty()) {
+                ClusterNode primary = affinityNodes.get(0);
+
+                primaryPartitions.computeIfAbsent(primary.consistentId(),
+                    id -> new HashSet<>(U.capacity(size / nodesSize * 
2))).add(p);
+            }
+        }
+
+        return primaryPartitions;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param assignment Assignment.
+     */
+    public static IdealAffinityAssignment create(AffinityTopologyVersion 
topVer, List<List<ClusterNode>> assignment) {
+        return create(topVer, null, assignment);
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param nodes Nodes.
+     * @param assignment Assignment.
+     */
+    public static IdealAffinityAssignment create(
+        AffinityTopologyVersion topVer,
+        @Nullable List<ClusterNode> nodes,
+        List<List<ClusterNode>> assignment
+    ) {
+        return new IdealAffinityAssignment(topVer, assignment, 
calculatePrimaries(nodes, assignment));
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param assignment Assignment.
+     * @param previousAssignment Previous assignment.
+     */
+    public static IdealAffinityAssignment createWithPreservedPrimaries(
+        AffinityTopologyVersion topVer,
+        List<List<ClusterNode>> assignment,
+        IdealAffinityAssignment previousAssignment
+    ) {
+        return new IdealAffinityAssignment(topVer, assignment, 
previousAssignment.idealPrimaries);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d07012e..203ffb7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,6 +51,7 @@ import 
org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.affinity.IdealAffinityAssignment;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
@@ -469,7 +471,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 CacheGroupHolder grpHolder = grpHolders.get(grp.groupId());
 
-                assert !crd || (grpHolder != null && 
grpHolder.affinity().idealAssignment() != null);
+                assert !crd || (grpHolder != null && 
grpHolder.affinity().idealAssignmentRaw() != null);
 
                 if (grpHolder == null)
                     grpHolder = getOrCreateGroupHolder(topVer, grpDesc);
@@ -1097,7 +1099,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         forAllCacheGroups(crd, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
-                List<List<ClusterNode>> idealAssignment = 
aff.idealAssignment();
+                List<List<ClusterNode>> idealAssignment = 
aff.idealAssignmentRaw();
 
                 assert idealAssignment != null;
 
@@ -1114,7 +1116,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 else
                     newAssignment = idealAssignment;
 
-                aff.initialize(topVer, cachedAssignment(aff, newAssignment, 
affCache));
+                aff.initialize(topVer, newAssignment);
 
                 exchFut.timeBag().finishLocalStage("Affinity recalculate by 
change affinity message " +
                     "[grp=" + aff.cacheOrGroupName() + "]");
@@ -1196,7 +1198,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         assignment.set(part, nodes);
                     }
 
-                    aff.initialize(topVer, cachedAssignment(aff, assignment, 
affCache));
+                    aff.initialize(topVer, assignment);
                 }
                 else
                     aff.clientEventTopologyChange(exchFut.firstEvent(), 
topVer);
@@ -1340,7 +1342,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         CacheGroupContext grp = cctx.kernalContext().cache().cacheGroup(grpId);
 
         if (grpHolder != null && grpHolder.nonAffNode() && grp != null) {
-            assert grpHolder.affinity().idealAssignment() != null;
+            assert grpHolder.affinity().idealAssignmentRaw() != null;
 
             grpHolder = new CacheGroupAffNodeHolder(grp, grpHolder.affinity());
 
@@ -1471,15 +1473,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         // Please do not use following pattern of code (nodesByOrder, 
affCache). NEVER.
         final Map<Long, ClusterNode> nodesByOrder = new ConcurrentHashMap<>();
 
-        final Map<Object, List<List<ClusterNode>>> affCache = new 
ConcurrentHashMap<>();
-
         forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                 ExchangeDiscoveryEvents evts = fut.context().events();
 
                 Map<Integer, CacheGroupAffinityMessage> idealAffDiff = 
msg.idealAffinityDiff();
 
-                List<List<ClusterNode>> idealAssignment = 
aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
+                List<List<ClusterNode>> idealAssignment = 
aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache()).assignment();
 
                 CacheGroupAffinityMessage affMsg = idealAffDiff != null ? 
idealAffDiff.get(aff.groupId()) : null;
 
@@ -1503,7 +1503,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 else
                     newAssignment = idealAssignment;
 
-                aff.initialize(evts.topologyVersion(), cachedAssignment(aff, 
newAssignment, affCache));
+                aff.initialize(evts.topologyVersion(), newAssignment);
 
                 fut.timeBag().finishLocalStage("Affinity applying from full 
message " +
                     "[grp=" + aff.cacheOrGroupName() + "]");
@@ -1559,7 +1559,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         affMsg.createIdealAssignments(nodesByOrder, 
evts.discoveryCache());
 
                     if (idealAssign != null)
-                        aff.idealAssignment(idealAssign);
+                        aff.idealAssignment(evts.topologyVersion(), 
idealAssign);
                     else {
                         assert !aff.centralizedAffinityFunction() : aff;
 
@@ -1620,9 +1620,238 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         assert fut.context().mergeExchanges();
         assert evts.hasServerLeft();
 
-        Map<Integer, CacheGroupAffinityMessage> result = 
onReassignmentEnforced(fut);
+        if (evts.hasServerLeft() && evts.hasServerJoin())
+            return onReassignmentEnforced(fut);
+        else
+            return onServerLeftWithExchangeMergeProtocolLightweight(fut);
+    }
 
-        return result;
+    /**
+     * @param fut Current exchange future.
+     * @return Computed difference with ideal affinity.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<Integer, CacheGroupAffinityMessage> 
onServerLeftWithExchangeMergeProtocolLightweight(
+        final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
+    {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+        final AffinityTopologyVersion topVer = evts.topologyVersion();
+
+        assert fut.context().mergeExchanges();
+        assert evts.hasServerLeft();
+
+        final WaitRebalanceInfo waitRebalanceInfo =
+            new 
WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
+
+        final Map<Integer, Map<Integer, List<Long>>> diff = new 
ConcurrentHashMap<>();
+
+        final Set<ClusterNode> aliveNodes = new 
HashSet<>(fut.context().events().discoveryCache().serverNodes());
+
+        forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
+            @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
+                AffinityTopologyVersion topVer = evts.topologyVersion();
+
+                CacheGroupHolder grpHolder = getOrCreateGroupHolder(topVer, 
desc);
+
+                IdealAffinityAssignment idealAssignment = 
grpHolder.affinity().calculate(topVer, evts, evts.discoveryCache());
+
+                if (!grpHolder.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) {
+                    grpHolder.affinity().initialize(topVer, 
idealAssignment.assignment());
+
+                    return;
+                }
+
+                AffinityTopologyVersion affTopVer = 
grpHolder.affinity().lastVersion();
+
+                List<List<ClusterNode>> curAssignment = 
grpHolder.affinity().assignments(affTopVer);
+
+                assert curAssignment != null;
+
+                List<List<ClusterNode>> newAssignment = new 
ArrayList<>(idealAssignment.assignment());
+
+                GridDhtPartitionTopology top = 
grpHolder.topology(fut.context().events().discoveryCache());
+
+                BitSet processedPartitions = new BitSet(curAssignment.size());
+
+                Map<Integer, List<Long>> cacheAffinityDiff = new HashMap<>();
+
+                for (ClusterNode leftNode : evts.leftServerNodes()) {
+                    for (int p : idealAssignment.idealPrimaries(leftNode)) {
+                        List<ClusterNode> curOwners = curAssignment.get(p);
+
+                        if (curOwners.isEmpty())
+                            continue;
+
+                        List<ClusterNode> idealOwners = 
idealAssignment.assignment().get(p);
+
+                        List<ClusterNode> newOwners = null;
+
+                        if (idealOwners.isEmpty())
+                            newOwners = selectCurrentAliveOwners(aliveNodes, 
curOwners);
+                        else if (curOwners.get(0).equals(leftNode))
+                            newOwners = 
selectPrimaryTopologyOwnerFromIdealAssignment(
+                                grpHolder.aff,
+                                p,
+                                top,
+                                idealOwners,
+                                waitRebalanceInfo
+                            );
+                        else if (!curOwners.get(0).equals(idealOwners.get(0)))
+                            newOwners = latePrimaryAssignment(
+                                grpHolder.aff,
+                                p,
+                                curOwners.get(0),
+                                idealOwners,
+                                waitRebalanceInfo
+                            );
+
+                        if (newOwners != null) {
+                            newAssignment.set(p, newOwners);
+
+                            List<Long> clusterNodesAsOrder = newOwners.stream()
+                                .map(NODE_TO_ORDER::apply)
+                                .collect(Collectors.toList());
+
+                            cacheAffinityDiff.put(p, clusterNodesAsOrder);
+
+                            processedPartitions.set(p);
+                        }
+                    }
+                }
+
+                Set<Integer> partitionsWithChangedPrimary = 
grpHolder.affinity().partitionPrimariesDifferentToIdeal(affTopVer);
+
+                // Re-check the partitions whose current primary differs from the
+                // ideal one, so that late affinity assignment stays correct for them.
+                for (int p : partitionsWithChangedPrimary) {
+                    if (processedPartitions.get(p))
+                        continue;
+
+                    List<ClusterNode> curOwners = curAssignment.get(p);
+
+                    if (curOwners.isEmpty())
+                        continue;
+
+                    List<ClusterNode> idealOwners = 
idealAssignment.assignment().get(p);
+
+                    List<ClusterNode> newOwners = null;
+
+                    if (idealOwners.isEmpty())
+                        newOwners = selectCurrentAliveOwners(aliveNodes, 
curOwners);
+                    else if (!aliveNodes.contains(curOwners.get(0)))
+                        newOwners = 
selectPrimaryTopologyOwnerFromIdealAssignment(
+                            grpHolder.aff,
+                            p,
+                            top,
+                            idealOwners,
+                            waitRebalanceInfo
+                        );
+                    else if (!curOwners.get(0).equals(idealOwners.get(0)))
+                        // Current distribution was already not ideal. 
Preserve it for late affinity assignment.
+                        newOwners = latePrimaryAssignment(
+                            grpHolder.aff,
+                            p,
+                            curOwners.get(0),
+                            idealOwners,
+                            waitRebalanceInfo
+                        );
+
+                    if (newOwners != null) {
+                        newAssignment.set(p, newOwners);
+
+                        List<Long> clusterNodesAsOrder = newOwners.stream()
+                            .map(NODE_TO_ORDER::apply)
+                            .collect(Collectors.toList());
+
+                        cacheAffinityDiff.put(p, clusterNodesAsOrder);
+                    }
+                }
+
+                if (!cacheAffinityDiff.isEmpty())
+                    diff.put(grpHolder.groupId(), cacheAffinityDiff);
+
+                grpHolder.affinity().initialize(topVer, newAssignment);
+
+                fut.timeBag().finishLocalStage("Affinity initialization (on 
server left) " +
+                    "[grp=" + desc.cacheOrGroupName() + "]");
+            }
+        });
+
+        synchronized (mux) {
+            this.waitInfo = !waitRebalanceInfo.empty() ? waitRebalanceInfo : 
null;
+
+            WaitRebalanceInfo info = this.waitInfo;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Computed new affinity after node left [topVer=" + 
topVer +
+                    ", waitGrps=" + (info != null ? 
groupNames(info.waitGrps.keySet()) : null) + ']');
+            }
+        }
+
+        return CacheGroupAffinityMessage.createAffinityDiffMessages(diff);
+    }
+
+    /**
+     * Selects current alive owners for some partition as affinity 
distribution.
+     *
+     * @param aliveNodes Alive cluster nodes.
+     * @param curOwners  Current affinity owners for some partition.
+     *
+     * @return List of current alive affinity owners.
+     *         {@code null} if affinity owners should be inherited from ideal 
assignment as is.
+     */
+    private @Nullable List<ClusterNode> selectCurrentAliveOwners(
+        Set<ClusterNode> aliveNodes,
+        List<ClusterNode> curOwners
+    ) {
+        List<ClusterNode> aliveCurOwners = 
curOwners.stream().filter(aliveNodes::contains).collect(Collectors.toList());
+
+        return !aliveCurOwners.isEmpty() ? aliveCurOwners : null;
+    }
+
+    /**
+     * Selects a node from ideal assignment that holds {@code OWNING} status 
for given partition as affinity primary.
+     * Other nodes from ideal assignment are selected as backups.
+     *
+     * @param aff               Affinity assignment cache.
+     * @param partition         Partition number.
+     * @param topology          Partition topology for cache.
+     * @param idealOwners       Ideal affinity distribution for given 
partition.
+     * @param waitRebalanceInfo Wait rebalance info for late affinity 
assignment.
+     *
+     * @return List of affinity owners where first node is primary and holds 
{@code OWNING} partition status.
+     *         {@code null} if affinity owners should be inherited from ideal 
assignment as is.
+     */
+    private @Nullable List<ClusterNode> 
selectPrimaryTopologyOwnerFromIdealAssignment(
+        GridAffinityAssignmentCache aff,
+        int partition,
+        GridDhtPartitionTopology topology,
+        List<ClusterNode> idealOwners,
+        WaitRebalanceInfo waitRebalanceInfo
+    ) {
+        ClusterNode newPrimary = idealOwners.get(0);
+
+        if (topology.partitionState(newPrimary.id(), partition) != 
GridDhtPartitionState.OWNING) {
+            for (ClusterNode node : idealOwners) {
+                if (topology.partitionState(node.id(), partition) == 
GridDhtPartitionState.OWNING) {
+                    newPrimary = node;
+
+                    break;
+                }
+            }
+        }
+
+        // If primary by ideal assignment is already topology owner, no need 
to change affinity for that partition.
+        if (newPrimary.equals(idealOwners.get(0)))
+            return null;
+
+        // Otherwise re-select the primary with late affinity assignment.
+        return latePrimaryAssignment(
+            aff,
+            partition,
+            newPrimary,
+            idealOwners,
+            waitRebalanceInfo);
     }
 
     /**
@@ -1660,7 +1889,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 CacheGroupHolder cache = getOrCreateGroupHolder(topVer, desc);
 
-                List<List<ClusterNode>> assign = 
cache.affinity().calculate(topVer, evts, evts.discoveryCache());
+                List<List<ClusterNode>> assign = 
cache.affinity().calculate(topVer, evts, evts.discoveryCache()).assignment();
 
                 if (!cache.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
                     cache.affinity().initialize(topVer, assign);
@@ -1795,7 +2024,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         GridAffinityAssignmentCache aff,
         AffinityTopologyVersion topVer)
     {
-        List<List<ClusterNode>> assignment = aff.calculate(topVer, evts, 
evts.discoveryCache());
+        List<List<ClusterNode>> assignment = aff.calculate(topVer, evts, 
evts.discoveryCache()).assignment();
 
         aff.initialize(topVer, assignment);
     }
@@ -1880,7 +2109,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, events, 
discoCache);
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, events, 
discoCache).assignment();
 
             affCache.initialize(topVer, aff);
         }
@@ -1888,7 +2117,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             List<List<ClusterNode>> idealAff = 
res.idealAffinityAssignment(discoCache);
 
             if (idealAff != null)
-                affCache.idealAssignment(idealAff);
+                affCache.idealAssignment(topVer, idealAff);
             else {
                 assert !affCache.centralizedAffinityFunction();
 
@@ -1960,7 +2189,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 CacheGroupHolder grpHolder = getOrCreateGroupHolder(topVer, 
desc);
 
-                if (grpHolder.affinity().idealAssignment() != null)
+                if (grpHolder.affinity().idealAssignmentRaw() != null)
                     return;
 
                 // Need initialize holders and affinity if this node became 
coordinator during this exchange.
@@ -2139,8 +2368,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final 
GridDhtPartitionsExchangeFuture fut, boolean crd) {
         final ExchangeDiscoveryEvents evts = fut.context().events();
 
-        final Map<Object, List<List<ClusterNode>>> affCache = new 
ConcurrentHashMap<>();
-
         final WaitRebalanceInfo waitRebalanceInfo = new 
WaitRebalanceInfo(evts.lastServerEventVersion());
 
         forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
@@ -2158,8 +2385,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     grpAdded,
                     cache.affinity(),
                     crd ? waitRebalanceInfo : null,
-                    latePrimary,
-                    affCache);
+                    latePrimary);
 
                 if (crd && grpAdded) {
                     AffinityAssignment aff = 
cache.aff.cachedAffinity(cache.aff.lastVersion());
@@ -2220,15 +2446,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is 
not owner.
-     * @param affCache Already calculated assignments (to reduce data stored 
in history).
      */
     private void initAffinityOnNodeJoin(
         ExchangeDiscoveryEvents evts,
         boolean addedOnExchnage,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
-        boolean latePrimary,
-        Map<Object, List<List<ClusterNode>>> affCache
+        boolean latePrimary
     ) {
         if (addedOnExchnage) {
             if (!aff.lastVersion().equals(evts.topologyVersion()))
@@ -2242,61 +2466,79 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized 
[grp=" + aff.cacheOrGroupName() +
             ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
 
-        List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
+        assert aff.idealAssignmentRaw() != null : "Previous assignment is not 
available.";
 
-        assert aff.idealAssignment() != null : "Previous assignment is not 
available.";
-
-        List<List<ClusterNode>> idealAssignment = 
aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
+        IdealAffinityAssignment idealAssignment = 
aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
+        List<List<ClusterNode>> curAssignment = aff.assignments(affTopVer);
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
-            for (int p = 0; p < idealAssignment.size(); p++) {
-                List<ClusterNode> newNodes = idealAssignment.get(p);
-                List<ClusterNode> curNodes = curAff.get(p);
+            BitSet processedPartitions = new BitSet(curAssignment.size());
+
+            // Late affinity assignment to changed primaries.
+            for (ClusterNode joinedNode : evts.joinedServerNodes()) {
+                Set<Integer> primaries = 
idealAssignment.idealPrimaries(joinedNode);
 
-                ClusterNode curPrimary = !curNodes.isEmpty() ? curNodes.get(0) 
: null;
-                ClusterNode newPrimary = !newNodes.isEmpty() ? newNodes.get(0) 
: null;
+                for (int p : primaries) {
+                    List<ClusterNode> curNodes = curAssignment.get(p);
+
+                    if (curNodes.isEmpty())
+                        continue;
+
+                    ClusterNode curPrimary = curNodes.get(0);
 
-                if (curPrimary != null && newPrimary != null && 
!curPrimary.equals(newPrimary)) {
                     assert cctx.discovery().node(evts.topologyVersion(), 
curPrimary.id()) != null : curPrimary;
 
-                    List<ClusterNode> nodes0 = latePrimaryAssignment(aff,
+                    List<ClusterNode> idealNodes = 
idealAssignment.assignment().get(p);
+
+                    List<ClusterNode> newNodes = latePrimaryAssignment(aff,
                         p,
                         curPrimary,
-                        newNodes,
+                        idealNodes,
                         rebalanceInfo);
 
                     if (newAssignment == null)
-                        newAssignment = new ArrayList<>(idealAssignment);
+                        newAssignment = new 
ArrayList<>(idealAssignment.assignment());
+
+                    newAssignment.set(p, newNodes);
 
-                    newAssignment.set(p, nodes0);
+                    processedPartitions.set(p);
                 }
             }
-        }
 
-        if (newAssignment == null)
-            newAssignment = idealAssignment;
+            Set<Integer> partitionsWithChangedPrimary = 
aff.partitionPrimariesDifferentToIdeal(affTopVer);
 
-        aff.initialize(evts.topologyVersion(), cachedAssignment(aff, 
newAssignment, affCache));
-    }
+            for (int p : partitionsWithChangedPrimary) {
+                // Already processed above.
+                if (processedPartitions.get(p))
+                    continue;
 
-    /**
-     * @param aff Assignment cache.
-     * @param assign Assignment.
-     * @param affCache Assignments already calculated for other caches.
-     * @return Assignment.
-     */
-    private List<List<ClusterNode>> 
cachedAssignment(GridAffinityAssignmentCache aff,
-        List<List<ClusterNode>> assign,
-        Map<Object, List<List<ClusterNode>>> affCache) {
-        List<List<ClusterNode>> assign0 = 
affCache.get(aff.similarAffinityKey());
+                List<ClusterNode> curNodes = curAssignment.get(p);
 
-        if (assign0 != null && assign0.equals(assign))
-            assign = assign0;
-        else
-            affCache.put(aff.similarAffinityKey(), assign);
+                if (curNodes.isEmpty())
+                    continue;
+
+                List<ClusterNode> idealOwners = 
idealAssignment.assignment().get(p);
+
+                if (!curNodes.get(0).equals(idealOwners.get(0))) {
+                    List<ClusterNode> newNodes = latePrimaryAssignment(aff,
+                        p,
+                        curNodes.get(0),
+                        idealOwners,
+                        rebalanceInfo);
+
+                    if (newAssignment == null)
+                        newAssignment = new 
ArrayList<>(idealAssignment.assignment());
+
+                    newAssignment.set(p, newNodes);
+                }
+            }
+        }
+
+        if (newAssignment == null)
+            newAssignment = idealAssignment.assignment();
 
-        return assign;
+        aff.initialize(evts.topologyVersion(), newAssignment);
     }
 
     /**
@@ -2407,7 +2649,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     "Invalid affinity version [last=" + affTopVer + ", 
futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']';
 
                 List<List<ClusterNode>> curAssignment = 
grpHolder.affinity().assignments(affTopVer);
-                List<List<ClusterNode>> newAssignment = 
grpHolder.affinity().idealAssignment();
+                List<List<ClusterNode>> newAssignment = 
grpHolder.affinity().idealAssignmentRaw();
 
                 assert newAssignment != null;
 
@@ -2436,7 +2678,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         ", node=" + newPrimary +
                         ", topVer=" + topVer + ']';
 
-                    List<ClusterNode> owners = top.owners(p);
+                    List<ClusterNode> owners = top.owners(p, topVer);
 
                     // It is essential that curPrimary node has partition in 
OWNING state.
                     if (!owners.isEmpty() && !owners.contains(curPrimary))
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index 23df8d4..b71abbd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteLogger;
@@ -57,13 +58,13 @@ public class ExchangeDiscoveryEvents {
     private DiscoveryEvent lastSrvEvt;
 
     /** All events. */
-    private List<DiscoveryEvent> evts = new ArrayList<>();
+    private List<DiscoveryEvent> evts = Collections.synchronizedList(new 
ArrayList<>());
 
-    /** Server join flag. */
-    private boolean srvJoin;
+    /** Joined server nodes. */
+    private List<ClusterNode> joinedSrvNodes = 
Collections.synchronizedList(new ArrayList<>());
 
-    /** Sever left flag. */
-    private boolean srvLeft;
+    /** Left server nodes. */
+    private List<ClusterNode> leftSrvNodes = Collections.synchronizedList(new 
ArrayList<>());
 
     /**
      * @param fut Current exchange future.
@@ -126,9 +127,9 @@ public class ExchangeDiscoveryEvents {
             srvEvtTopVer = new AffinityTopologyVersion(evt.topologyVersion(), 
0);
 
             if (evt.type()== EVT_NODE_JOINED)
-                srvJoin = true;
+                joinedSrvNodes.add(evt.eventNode());
             else if (evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED)
-                srvLeft = !node.isClient();
+                leftSrvNodes.add(evt.eventNode());
         }
     }
 
@@ -180,14 +181,28 @@ public class ExchangeDiscoveryEvents {
      * @return {@code True} if has event for server join.
      */
     public boolean hasServerJoin() {
-        return srvJoin;
+        return !joinedSrvNodes.isEmpty();
     }
 
     /**
      * @return {@code True} if has event for server leave.
      */
     public boolean hasServerLeft() {
-        return srvLeft;
+        return !leftSrvNodes.isEmpty();
+    }
+
+    /**
+     * @return Joined server nodes.
+     */
+    public List<ClusterNode> joinedServerNodes() {
+        return joinedSrvNodes;
+    }
+
+    /**
+     * @return Left server nodes.
+     */
+    public List<ClusterNode> leftServerNodes() {
+        return leftSrvNodes;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 1315c67..855923f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -137,7 +137,7 @@ public class GridCacheAffinityManager extends 
GridCacheManagerAdapter {
     public List<List<ClusterNode>> idealAssignment() {
         assert !cctx.isLocal();
 
-        return aff.idealAssignment();
+        return aff.idealAssignmentRaw();
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index c6abe89..7115ac1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -169,7 +169,7 @@ public class CacheGroupAffinityMessage implements Message {
 
                 return new CacheGroupAffinityMessage(
                     assign,
-                    aff.centralizedAffinityFunction() ? aff.idealAssignment() 
: null,
+                    aff.centralizedAffinityFunction() ? 
aff.idealAssignmentRaw() : null,
                     null
                 );
             });
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8e5caa7..ab17e85 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1375,7 +1375,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     GridAffinityAssignmentCache aff = grp.affinity();
 
-                    aff.initialize(initialVersion(), aff.idealAssignment());
+                    aff.initialize(initialVersion(), aff.idealAssignmentRaw());
 
                     cctx.exchange().exchangerUpdateHeartbeat();
                 }
@@ -4429,7 +4429,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             for (int i = 0; i < grp.affinity().partitions(); i++)
                 affAssignment.add(empty);
 
-            grp.affinity().idealAssignment(affAssignment);
+            grp.affinity().idealAssignment(initialVersion(), affAssignment);
 
             grp.affinity().initialize(initialVersion(), affAssignment);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index d43a623..026a54f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -583,7 +583,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                             assert !exchFut.context().mergeExchanges();
 
                             affVer = exchFut.initialVersion();
-                            affAssignment = grp.affinity().idealAssignment();
+                            affAssignment = 
grp.affinity().idealAssignmentRaw();
                         }
 
                         initPartitions(affVer, affAssignment, exchFut, 
updateSeq);

Reply via email to