This is an automated email from the ASF dual-hosted git repository. jokser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new afe7933 IGNITE-10799 Optimize affinity recalculation in case of node join or leave - Fixes #6242. afe7933 is described below commit afe7933b156d51691997fefd251b76de5ea15e1a Author: Pavel Kovalenko <jokse...@gmail.com> AuthorDate: Wed Apr 10 16:26:30 2019 +0300 IGNITE-10799 Optimize affinity recalculation in case of node join or leave - Fixes #6242. Signed-off-by: Pavel Kovalenko <jokse...@gmail.com> --- .../processors/affinity/AffinityAssignment.java | 5 + .../affinity/GridAffinityAssignment.java | 5 + .../affinity/GridAffinityAssignmentCache.java | 126 +++++-- .../affinity/GridAffinityAssignmentV2.java | 23 +- .../affinity/HistoryAffinityAssignmentImpl.java | 10 + .../HistoryAffinityAssignmentShallowCopy.java | 5 + .../affinity/IdealAffinityAssignment.java | 148 +++++++++ .../cache/CacheAffinitySharedManager.java | 362 +++++++++++++++++---- .../processors/cache/ExchangeDiscoveryEvents.java | 33 +- .../processors/cache/GridCacheAffinityManager.java | 2 +- .../dht/preloader/CacheGroupAffinityMessage.java | 2 +- .../preloader/GridDhtPartitionsExchangeFuture.java | 4 +- .../dht/topology/GridDhtPartitionTopologyImpl.java | 2 +- 13 files changed, 616 insertions(+), 111 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java index 62adaa7..b8b1089 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java @@ -100,6 +100,11 @@ public interface AffinityAssignment { public Set<Integer> backupPartitions(UUID nodeId); /** + * @return Set of partitions which primary is different to primary in ideal assignment. 
+ */ + public Set<Integer> partitionPrimariesDifferentToIdeal(); + + /** * Converts List of Cluster Nodes to HashSet of UUIDs wrapped as unmodifiable collection. * @param assignmentPart Source assignment per partition. * @return List of deduplicated collections if ClusterNode's ids. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 0d68226..8feb39e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -239,6 +239,11 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable return set == null ? Collections.<Integer>emptySet() : set; } + /** {@inheritDoc} */ + @Override public Set<Integer> partitionPrimariesDifferentToIdeal() { + return Collections.emptySet(); + } + /** * Initializes primary and backup maps. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 0f468ac..95e26b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -96,13 +96,13 @@ public class GridAffinityAssignmentCache { private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache; /** */ - private List<List<ClusterNode>> idealAssignment; + private volatile IdealAffinityAssignment idealAssignment; /** */ - private BaselineTopology baselineTopology; + private volatile IdealAffinityAssignment baselineAssignment; /** */ - private List<List<ClusterNode>> baselineAssignment; + private BaselineTopology baselineTopology; /** Cache item corresponding to the head topology version. */ private final AtomicReference<GridAffinityAssignmentV2> head; @@ -208,7 +208,7 @@ public class GridAffinityAssignmentCache { assert idealAssignment != null; - GridAffinityAssignmentV2 assignment = new GridAffinityAssignmentV2(topVer, affAssignment, idealAssignment); + GridAffinityAssignmentV2 assignment = new GridAffinityAssignmentV2(topVer, affAssignment, idealAssignment.assignment()); HistoryAffinityAssignmentImpl newHistEntry = new HistoryAffinityAssignmentImpl(assignment, backups); @@ -238,14 +238,21 @@ public class GridAffinityAssignmentCache { /** * @param assignment Assignment. */ - public void idealAssignment(List<List<ClusterNode>> assignment) { - this.idealAssignment = assignment; + public void idealAssignment(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment) { + this.idealAssignment = IdealAffinityAssignment.create(topVer, assignment); } /** * @return Assignment. 
*/ - @Nullable public List<List<ClusterNode>> idealAssignment() { + @Nullable public List<List<ClusterNode>> idealAssignmentRaw() { + return idealAssignment != null ? idealAssignment.assignment() : null; + } + + /** + * + */ + @Nullable public IdealAffinityAssignment idealAssignment() { return idealAssignment; } @@ -284,23 +291,27 @@ public class GridAffinityAssignmentCache { } /** - * Calculates affinity cache for given topology version. + * Calculates ideal assignment for given topology version and events happened since last calculation. * * @param topVer Topology version to calculate affinity cache for. * @param events Discovery events that caused this topology version change. * @param discoCache Discovery cache. - * @return Affinity assignments. + * @return Ideal affinity assignment. */ - public List<List<ClusterNode>> calculate( + public IdealAffinityAssignment calculate( AffinityTopologyVersion topVer, @Nullable ExchangeDiscoveryEvents events, @Nullable DiscoCache discoCache ) { if (log.isDebugEnabled()) - log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + + log.debug("Calculating ideal affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvts=" + events + ']'); - List<List<ClusterNode>> prevAssignment = idealAssignment; + IdealAffinityAssignment prevAssignment = idealAssignment; + + // Already calculated. + if (prevAssignment != null && prevAssignment.topologyVersion().equals(topVer)) + return prevAssignment; // Resolve nodes snapshot for specified topology version. 
List<ClusterNode> sorted; @@ -323,7 +334,7 @@ public class GridAffinityAssignmentCache { !discoCache.state().baselineTopology().equals(baselineTopology); } - List<List<ClusterNode>> assignment; + IdealAffinityAssignment assignment; if (prevAssignment != null && events != null) { /* Skip affinity calculation only when all nodes triggered exchange @@ -343,23 +354,45 @@ public class GridAffinityAssignmentCache { if (skipCalculation) assignment = prevAssignment; else if (hasBaseline && !changedBaseline) { - if (baselineAssignment == null) - baselineAssignment = aff.assignPartitions(new GridAffinityFunctionContextImpl( - discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter), - prevAssignment, events.lastEvent(), topVer, backups)); + if (baselineAssignment == null) { + List<ClusterNode> baselineAffinityNodes = discoCache.state().baselineTopology() + .createBaselineView(sorted, nodeFilter); + + List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl( + baselineAffinityNodes, prevAssignment != null ? prevAssignment.assignment() : null, + events.lastEvent(), topVer, backups)); + + baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated); + } - assignment = currentBaselineAssignment(topVer); + assignment = IdealAffinityAssignment.createWithPreservedPrimaries( + topVer, + baselineAssignmentWithoutOfflineNodes(topVer), + baselineAssignment + ); } else if (hasBaseline && changedBaseline) { - baselineAssignment = aff.assignPartitions(new GridAffinityFunctionContextImpl( - discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter), - prevAssignment, events.lastEvent(), topVer, backups)); + List<ClusterNode> baselineAffinityNodes = discoCache.state().baselineTopology() + .createBaselineView(sorted, nodeFilter); + + List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl( + baselineAffinityNodes, prevAssignment != null ? 
prevAssignment.assignment() : null, + events.lastEvent(), topVer, backups)); - assignment = currentBaselineAssignment(topVer); + baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated); + + assignment = IdealAffinityAssignment.createWithPreservedPrimaries( + topVer, + baselineAssignmentWithoutOfflineNodes(topVer), + baselineAssignment + ); } else { - assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, + List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, + prevAssignment != null ? prevAssignment.assignment() : null, events.lastEvent(), topVer, backups)); + + assignment = IdealAffinityAssignment.create(topVer, sorted, calculated); } } else { @@ -369,15 +402,27 @@ public class GridAffinityAssignmentCache { event = events.lastEvent(); if (hasBaseline) { - baselineAssignment = aff.assignPartitions(new GridAffinityFunctionContextImpl( - discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter), - prevAssignment, event, topVer, backups)); + List<ClusterNode> baselineAffinityNodes = discoCache.state().baselineTopology() + .createBaselineView(sorted, nodeFilter); + + List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl( + baselineAffinityNodes, prevAssignment != null ? prevAssignment.assignment() : null, + event, topVer, backups)); + + baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated); - assignment = currentBaselineAssignment(topVer); + assignment = IdealAffinityAssignment.createWithPreservedPrimaries( + topVer, + baselineAssignmentWithoutOfflineNodes(topVer), + baselineAssignment + ); } else { - assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, + List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, + prevAssignment != null ? 
prevAssignment.assignment() : null, event, topVer, backups)); + + assignment = IdealAffinityAssignment.create(topVer, sorted, calculated); } } @@ -386,7 +431,7 @@ public class GridAffinityAssignmentCache { idealAssignment = assignment; if (ctx.cache().cacheMode(cacheOrGrpName) == PARTITIONED && !ctx.clientNode()) - printDistributionIfThresholdExceeded(assignment, sorted.size()); + printDistributionIfThresholdExceeded(assignment.assignment(), sorted.size()); if (hasBaseline) { baselineTopology = discoCache.state().baselineTopology(); @@ -398,7 +443,7 @@ public class GridAffinityAssignmentCache { } if (locCache) - initialize(topVer, assignment); + initialize(topVer, assignment.assignment()); return assignment; } @@ -407,7 +452,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Baseline assignment with filtered out offline nodes. */ - private List<List<ClusterNode>> currentBaselineAssignment(AffinityTopologyVersion topVer) { + private List<List<ClusterNode>> baselineAssignmentWithoutOfflineNodes(AffinityTopologyVersion topVer) { Map<Object, ClusterNode> alives = new HashMap<>(); for (ClusterNode node : ctx.discovery().nodes(topVer)) { @@ -415,10 +460,12 @@ public class GridAffinityAssignmentCache { alives.put(node.consistentId(), node); } - List<List<ClusterNode>> result = new ArrayList<>(baselineAssignment.size()); + List<List<ClusterNode>> assignment = baselineAssignment.assignment(); - for (int p = 0; p < baselineAssignment.size(); p++) { - List<ClusterNode> baselineMapping = baselineAssignment.get(p); + List<List<ClusterNode>> result = new ArrayList<>(assignment.size()); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> baselineMapping = assignment.get(p); List<ClusterNode> currentMapping = null; for (ClusterNode node : baselineMapping) { @@ -604,6 +651,13 @@ public class GridAffinityAssignmentCache { } /** + * @param topVer Topology version. 
+ */ + public Set<Integer> partitionPrimariesDifferentToIdeal(AffinityTopologyVersion topVer) { + return cachedAffinity(topVer).partitionPrimariesDifferentToIdeal(); + } + + /** * Get primary partitions for specified node ID. * * @param nodeId Node ID to get primary partitions for. @@ -789,9 +843,9 @@ public class GridAffinityAssignmentCache { */ public void init(GridAffinityAssignmentCache aff) { assert aff.lastVersion().compareTo(lastVersion()) >= 0; - assert aff.idealAssignment() != null; + assert aff.idealAssignmentRaw() != null; - idealAssignment(aff.idealAssignment()); + idealAssignment(aff.lastVersion(), aff.idealAssignmentRaw()); AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java index 4a8f9a4..baee696 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2.java @@ -57,6 +57,9 @@ public class GridAffinityAssignmentV2 extends IgniteDataTransferObject implement /** Map of backup node partitions. */ private Map<UUID, Set<Integer>> backup; + /** Set of partitions whose primary differs from the primary in the ideal assignment. */ + private Set<Integer> primariesDifferentToIdeal; + /** Assignment node IDs */ private transient volatile List<Collection<UUID>> assignmentIds; @@ -109,12 +112,15 @@ public class GridAffinityAssignmentV2 extends IgniteDataTransferObject implement // Temporary mirrors with modifiable partition's collections.
Map<UUID, Set<Integer>> tmpPrimary = new HashMap<>(); Map<UUID, Set<Integer>> tmpBackup = new HashMap<>(); + Set<Integer> primariesDifferentToIdeal = new HashSet<>(); boolean isPrimary; for (int partsCnt = assignment.size(), p = 0; p < partsCnt; p++) { isPrimary = true; - for (ClusterNode node : assignment.get(p)) { + List<ClusterNode> currentOwners = assignment.get(p); + + for (ClusterNode node : currentOwners) { UUID id = node.id(); Map<UUID, Set<Integer>> tmp = isPrimary ? tmpPrimary : tmpBackup; @@ -130,10 +136,19 @@ public class GridAffinityAssignmentV2 extends IgniteDataTransferObject implement isPrimary = false; } + + List<ClusterNode> idealOwners = p < idealAssignment.size() ? idealAssignment.get(p) : Collections.emptyList(); + + ClusterNode curPrimary = !currentOwners.isEmpty() ? currentOwners.get(0) : null; + ClusterNode idealPrimary = !idealOwners.isEmpty() ? idealOwners.get(0) : null; + + if (curPrimary != null && !curPrimary.equals(idealPrimary)) + primariesDifferentToIdeal.add(p); } primary = Collections.unmodifiableMap(tmpPrimary); backup = Collections.unmodifiableMap(tmpBackup); + this.primariesDifferentToIdeal = Collections.unmodifiableSet(primariesDifferentToIdeal); } /** @@ -147,6 +162,7 @@ public class GridAffinityAssignmentV2 extends IgniteDataTransferObject implement idealAssignment = aff.idealAssignment; primary = aff.primary; backup = aff.backup; + primariesDifferentToIdeal = aff.primariesDifferentToIdeal; } /** @@ -289,6 +305,11 @@ public class GridAffinityAssignmentV2 extends IgniteDataTransferObject implement } /** {@inheritDoc} */ + @Override public Set<Integer> partitionPrimariesDifferentToIdeal() { + return Collections.unmodifiableSet(primariesDifferentToIdeal); + } + + /** {@inheritDoc} */ @Override public int hashCode() { return topVer.hashCode(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java index 1eda706..1123756 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java @@ -56,6 +56,9 @@ public class HistoryAffinityAssignmentImpl implements HistoryAffinityAssignment /** Diff with ideal. */ private final Map<Integer, char[]> assignmentDiff; + /** Partition primaries different to ideal. */ + private final Set<Integer> partitionPrimariesDifferentToIdeal; + /** * @param assign Assignment. * @param backups Backups. @@ -63,6 +66,8 @@ public class HistoryAffinityAssignmentImpl implements HistoryAffinityAssignment public HistoryAffinityAssignmentImpl(AffinityAssignment assign, int backups) { topVer = assign.topologyVersion(); + partitionPrimariesDifferentToIdeal = assign.partitionPrimariesDifferentToIdeal(); + if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION || backups > IGNITE_AFFINITY_BACKUPS_THRESHOLD) { assignment = assign.assignment(); @@ -320,6 +325,11 @@ public class HistoryAffinityAssignmentImpl implements HistoryAffinityAssignment } /** {@inheritDoc} */ + @Override public Set<Integer> partitionPrimariesDifferentToIdeal() { + return Collections.unmodifiableSet(partitionPrimariesDifferentToIdeal); + } + + /** {@inheritDoc} */ @Override public boolean requiresHistoryCleanup() { return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java index 4fcea72..ac9bfa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java @@ -96,6 +96,11 @@ public class HistoryAffinityAssignmentShallowCopy implements HistoryAffinityAssi } /** {@inheritDoc} */ + @Override public Set<Integer> partitionPrimariesDifferentToIdeal() { + return histAssignment.partitionPrimariesDifferentToIdeal(); + } + + /** {@inheritDoc} */ @Override public HistoryAffinityAssignment origin() { return histAssignment; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/IdealAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/IdealAffinityAssignment.java new file mode 100644 index 0000000..c930ec5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/IdealAffinityAssignment.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.affinity; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Ideal affinity assignment for a topology version, with ideal primary partitions grouped by node consistent ID. + */ +public class IdealAffinityAssignment { + /** Topology version. */ + private final AffinityTopologyVersion topologyVersion; + + /** Assignment. */ + private final List<List<ClusterNode>> assignment; + + /** Ideal primaries. */ + private final Map<Object, Set<Integer>> idealPrimaries; + + /** + * @param topologyVersion Topology version. + * @param assignment Assignment. + * @param idealPrimaries Ideal primaries. + */ + private IdealAffinityAssignment( + AffinityTopologyVersion topologyVersion, + List<List<ClusterNode>> assignment, + Map<Object, Set<Integer>> idealPrimaries + ) { + this.topologyVersion = topologyVersion; + this.assignment = assignment; + this.idealPrimaries = idealPrimaries; + } + + /** + * @param clusterNode Cluster node. + */ + public Set<Integer> idealPrimaries(ClusterNode clusterNode) { + Object consistentId = clusterNode.consistentId(); + + assert consistentId != null : clusterNode; + + return idealPrimaries.getOrDefault(consistentId, Collections.emptySet()); + } + + /** + * @param partition Partition. + */ + public ClusterNode currentPrimary(int partition) { + return assignment.get(partition).get(0); + } + + /** + * @return Assignment: affinity nodes for each partition, primary node first. + */ + public List<List<ClusterNode>> assignment() { + return assignment; + } + + /** + * @return Topology version of this assignment. 
+ */ + public AffinityTopologyVersion topologyVersion() { + return topologyVersion; + } + + /** + * @param nodes Nodes. + * @param assignment Assignment. + */ + private static Map<Object, Set<Integer>> calculatePrimaries( + @Nullable List<ClusterNode> nodes, + List<List<ClusterNode>> assignment + ) { + int nodesSize = nodes != null ?
nodes.size() : 100; + + Map<Object, Set<Integer>> primaryPartitions = U.newHashMap(nodesSize); + + for (int size = assignment.size(), p = 0; p < size; p++) { + List<ClusterNode> affinityNodes = assignment.get(p); + + if (!affinityNodes.isEmpty()) { + ClusterNode primary = affinityNodes.get(0); + + primaryPartitions.computeIfAbsent(primary.consistentId(), + id -> new HashSet<>(U.capacity(size / nodesSize * 2))).add(p); + } + } + + return primaryPartitions; + } + + /** + * @param topVer Topology version. + * @param assignment Assignment. + */ + public static IdealAffinityAssignment create(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment) { + return create(topVer, null, assignment); + } + + /** + * @param topVer Topology version. + * @param nodes Nodes. + * @param assignment Assignment. + */ + public static IdealAffinityAssignment create( + AffinityTopologyVersion topVer, + @Nullable List<ClusterNode> nodes, + List<List<ClusterNode>> assignment + ) { + return new IdealAffinityAssignment(topVer, assignment, calculatePrimaries(nodes, assignment)); + } + + /** + * @param topVer Topology version. + * @param assignment Assignment. + * @param previousAssignment Previous assignment. 
+ */ + public static IdealAffinityAssignment createWithPreservedPrimaries( + AffinityTopologyVersion topVer, + List<List<ClusterNode>> assignment, + IdealAffinityAssignment previousAssignment + ) { + return new IdealAffinityAssignment(topVer, assignment, previousAssignment.idealPrimaries); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index d07012e..203ffb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -50,6 +51,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.affinity.IdealAffinityAssignment; import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture; @@ -469,7 +471,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupHolder grpHolder = grpHolders.get(grp.groupId()); - assert !crd || (grpHolder != null && grpHolder.affinity().idealAssignment() != null); + assert !crd || (grpHolder != null && 
grpHolder.affinity().idealAssignmentRaw() != null); if (grpHolder == null) grpHolder = getOrCreateGroupHolder(topVer, grpDesc); @@ -1097,7 +1099,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { - List<List<ClusterNode>> idealAssignment = aff.idealAssignment(); + List<List<ClusterNode>> idealAssignment = aff.idealAssignmentRaw(); assert idealAssignment != null; @@ -1114,7 +1116,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else newAssignment = idealAssignment; - aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache)); + aff.initialize(topVer, newAssignment); exchFut.timeBag().finishLocalStage("Affinity recalculate by change affinity message " + "[grp=" + aff.cacheOrGroupName() + "]"); @@ -1196,7 +1198,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assignment.set(part, nodes); } - aff.initialize(topVer, cachedAssignment(aff, assignment, affCache)); + aff.initialize(topVer, assignment); } else aff.clientEventTopologyChange(exchFut.firstEvent(), topVer); @@ -1340,7 +1342,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupContext grp = cctx.kernalContext().cache().cacheGroup(grpId); if (grpHolder != null && grpHolder.nonAffNode() && grp != null) { - assert grpHolder.affinity().idealAssignment() != null; + assert grpHolder.affinity().idealAssignmentRaw() != null; grpHolder = new CacheGroupAffNodeHolder(grp, grpHolder.affinity()); @@ -1471,15 +1473,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap // Please do not use following pattern of code (nodesByOrder, affCache). NEVER. 
final Map<Long, ClusterNode> nodesByOrder = new ConcurrentHashMap<>(); - final Map<Object, List<List<ClusterNode>>> affCache = new ConcurrentHashMap<>(); - forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { ExchangeDiscoveryEvents evts = fut.context().events(); Map<Integer, CacheGroupAffinityMessage> idealAffDiff = msg.idealAffinityDiff(); - List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache()); + List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache()).assignment(); CacheGroupAffinityMessage affMsg = idealAffDiff != null ? idealAffDiff.get(aff.groupId()) : null; @@ -1503,7 +1503,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else newAssignment = idealAssignment; - aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + aff.initialize(evts.topologyVersion(), newAssignment); fut.timeBag().finishLocalStage("Affinity applying from full message " + "[grp=" + aff.cacheOrGroupName() + "]"); @@ -1559,7 +1559,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap affMsg.createIdealAssignments(nodesByOrder, evts.discoveryCache()); if (idealAssign != null) - aff.idealAssignment(idealAssign); + aff.idealAssignment(evts.topologyVersion(), idealAssign); else { assert !aff.centralizedAffinityFunction() : aff; @@ -1620,9 +1620,238 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert fut.context().mergeExchanges(); assert evts.hasServerLeft(); - Map<Integer, CacheGroupAffinityMessage> result = onReassignmentEnforced(fut); + if (evts.hasServerLeft() && evts.hasServerJoin()) + return onReassignmentEnforced(fut); + else + return onServerLeftWithExchangeMergeProtocolLightweight(fut); + } - return result; + /** + * @param 
fut Current exchange future. + * @return Computed difference with ideal affinity. + * @throws IgniteCheckedException If failed. + */ + private Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocolLightweight( + final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException + { + final ExchangeDiscoveryEvents evts = fut.context().events(); + final AffinityTopologyVersion topVer = evts.topologyVersion(); + + assert fut.context().mergeExchanges(); + assert evts.hasServerLeft(); + + final WaitRebalanceInfo waitRebalanceInfo = + new WaitRebalanceInfo(fut.context().events().lastServerEventVersion()); + + final Map<Integer, Map<Integer, List<Long>>> diff = new ConcurrentHashMap<>(); + + final Set<ClusterNode> aliveNodes = new HashSet<>(fut.context().events().discoveryCache().serverNodes()); + + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + AffinityTopologyVersion topVer = evts.topologyVersion(); + + CacheGroupHolder grpHolder = getOrCreateGroupHolder(topVer, desc); + + IdealAffinityAssignment idealAssignment = grpHolder.affinity().calculate(topVer, evts, evts.discoveryCache()); + + if (!grpHolder.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) { + grpHolder.affinity().initialize(topVer, idealAssignment.assignment()); + + return; + } + + AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion(); + + List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer); + + assert curAssignment != null; + + List<List<ClusterNode>> newAssignment = new ArrayList<>(idealAssignment.assignment()); + + GridDhtPartitionTopology top = grpHolder.topology(fut.context().events().discoveryCache()); + + BitSet processedPartitions = new BitSet(curAssignment.size()); + + Map<Integer, List<Long>> cacheAffinityDiff = new HashMap<>(); + + for (ClusterNode leftNode : 
evts.leftServerNodes()) { + for (int p : idealAssignment.idealPrimaries(leftNode)) { + List<ClusterNode> curOwners = curAssignment.get(p); + + if (curOwners.isEmpty()) + continue; + + List<ClusterNode> idealOwners = idealAssignment.assignment().get(p); + + List<ClusterNode> newOwners = null; + + if (idealOwners.isEmpty()) + newOwners = selectCurrentAliveOwners(aliveNodes, curOwners); + else if (curOwners.get(0).equals(leftNode)) + newOwners = selectPrimaryTopologyOwnerFromIdealAssignment( + grpHolder.aff, + p, + top, + idealOwners, + waitRebalanceInfo + ); + else if (!curOwners.get(0).equals(idealOwners.get(0))) + newOwners = latePrimaryAssignment( + grpHolder.aff, + p, + curOwners.get(0), + idealOwners, + waitRebalanceInfo + ); + + if (newOwners != null) { + newAssignment.set(p, newOwners); + + List<Long> clusterNodesAsOrder = newOwners.stream() + .map(NODE_TO_ORDER::apply) + .collect(Collectors.toList()); + + cacheAffinityDiff.put(p, clusterNodesAsOrder); + + processedPartitions.set(p); + } + } + } + + Set<Integer> partitionsWithChangedPrimary = grpHolder.affinity().partitionPrimariesDifferentToIdeal(affTopVer); + + // We need to re-check partitions for further correct late affinity assignment + // where primary node is not as in ideal assignment. + for (int p : partitionsWithChangedPrimary) { + if (processedPartitions.get(p)) + continue; + + List<ClusterNode> curOwners = curAssignment.get(p); + + if (curOwners.isEmpty()) + continue; + + List<ClusterNode> idealOwners = idealAssignment.assignment().get(p); + + List<ClusterNode> newOwners = null; + + if (idealOwners.isEmpty()) + newOwners = selectCurrentAliveOwners(aliveNodes, curOwners); + else if (!aliveNodes.contains(curOwners.get(0))) + newOwners = selectPrimaryTopologyOwnerFromIdealAssignment( + grpHolder.aff, + p, + top, + idealOwners, + waitRebalanceInfo + ); + else if (!curOwners.get(0).equals(idealOwners.get(0))) + // Current distribution was already not ideal. Preserve it for late affinity assignment. 
+ newOwners = latePrimaryAssignment( + grpHolder.aff, + p, + curOwners.get(0), + idealOwners, + waitRebalanceInfo + ); + + if (newOwners != null) { + newAssignment.set(p, newOwners); + + List<Long> clusterNodesAsOrder = newOwners.stream() + .map(NODE_TO_ORDER::apply) + .collect(Collectors.toList()); + + cacheAffinityDiff.put(p, clusterNodesAsOrder); + } + } + + if (!cacheAffinityDiff.isEmpty()) + diff.put(grpHolder.groupId(), cacheAffinityDiff); + + grpHolder.affinity().initialize(topVer, newAssignment); + + fut.timeBag().finishLocalStage("Affinity initialization (on server left) " + + "[grp=" + desc.cacheOrGroupName() + "]"); + } + }); + + synchronized (mux) { + this.waitInfo = !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; + + WaitRebalanceInfo info = this.waitInfo; + + if (log.isDebugEnabled()) { + log.debug("Computed new affinity after node left [topVer=" + topVer + + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); + } + } + + return CacheGroupAffinityMessage.createAffinityDiffMessages(diff); + } + + /** + * Selects current alive owners for some partition as affinity distribution. + * + * @param aliveNodes Alive cluster nodes. + * @param curOwners Current affinity owners for some partition. + * + * @return List of current alive affinity owners. + * {@code null} if affinity owners should be inherited from ideal assignment as is. + */ + private @Nullable List<ClusterNode> selectCurrentAliveOwners( + Set<ClusterNode> aliveNodes, + List<ClusterNode> curOwners + ) { + List<ClusterNode> aliveCurOwners = curOwners.stream().filter(aliveNodes::contains).collect(Collectors.toList()); + + return !aliveCurOwners.isEmpty() ? aliveCurOwners : null; + } + + /** + * Selects a node from ideal assignment that holds {@code OWNING} status for given partition as affinity primary. + * Other nodes from ideal assignment are selected as backups. + * + * @param aff Affinity assignment cache. + * @param partition Partition number. 
+ * @param topology Partition topology for cache. + * @param idealOwners Ideal affinity distribution for given partition. + * @param waitRebalanceInfo Wait rebalance info for late affinity assignment. + + * @return List of affinity owners where first node is primary and holds {@code OWNING} partition status. + * {@code null} if affinity owners should be inherited from ideal assignment as is. + */ + private @Nullable List<ClusterNode> selectPrimaryTopologyOwnerFromIdealAssignment( + GridAffinityAssignmentCache aff, + int partition, + GridDhtPartitionTopology topology, + List<ClusterNode> idealOwners, + WaitRebalanceInfo waitRebalanceInfo + ) { + ClusterNode newPrimary = idealOwners.get(0); + + if (topology.partitionState(newPrimary.id(), partition) != GridDhtPartitionState.OWNING) { + for (ClusterNode node : idealOwners) { + if (topology.partitionState(node.id(), partition) == GridDhtPartitionState.OWNING) { + newPrimary = node; + + break; + } + } + } + + // If primary by ideal assignment is already topology owner, no need to change affinity for that partition. + if (newPrimary.equals(idealOwners.get(0))) + return null; + + // In other case re-select primary with late affinity assignment. 
+ return latePrimaryAssignment( + aff, + partition, + newPrimary, + idealOwners, + waitRebalanceInfo); } /** @@ -1660,7 +1889,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupHolder cache = getOrCreateGroupHolder(topVer, desc); - List<List<ClusterNode>> assign = cache.affinity().calculate(topVer, evts, evts.discoveryCache()); + List<List<ClusterNode>> assign = cache.affinity().calculate(topVer, evts, evts.discoveryCache()).assignment(); if (!cache.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) cache.affinity().initialize(topVer, assign); @@ -1795,7 +2024,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridAffinityAssignmentCache aff, AffinityTopologyVersion topVer) { - List<List<ClusterNode>> assignment = aff.calculate(topVer, evts, evts.discoveryCache()); + List<List<ClusterNode>> assignment = aff.calculate(topVer, evts, evts.discoveryCache()).assignment(); aff.initialize(topVer, assignment); } @@ -1880,7 +2109,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtAffinityAssignmentResponse res = fetchFut.get(); if (res == null) { - List<List<ClusterNode>> aff = affCache.calculate(topVer, events, discoCache); + List<List<ClusterNode>> aff = affCache.calculate(topVer, events, discoCache).assignment(); affCache.initialize(topVer, aff); } @@ -1888,7 +2117,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache); if (idealAff != null) - affCache.idealAssignment(idealAff); + affCache.idealAssignment(topVer, idealAff); else { assert !affCache.centralizedAffinityFunction(); @@ -1960,7 +2189,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { CacheGroupHolder grpHolder = 
getOrCreateGroupHolder(topVer, desc); - if (grpHolder.affinity().idealAssignment() != null) + if (grpHolder.affinity().idealAssignmentRaw() != null) return; // Need initialize holders and affinity if this node became coordinator during this exchange. @@ -2139,8 +2368,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) { final ExchangeDiscoveryEvents evts = fut.context().events(); - final Map<Object, List<List<ClusterNode>>> affCache = new ConcurrentHashMap<>(); - final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.lastServerEventVersion()); forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @@ -2158,8 +2385,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grpAdded, cache.affinity(), crd ? waitRebalanceInfo : null, - latePrimary, - affCache); + latePrimary); if (crd && grpAdded) { AffinityAssignment aff = cache.aff.cachedAffinity(cache.aff.lastVersion()); @@ -2220,15 +2446,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity. * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. - * @param affCache Already calculated assignments (to reduce data stored in history). 
*/ private void initAffinityOnNodeJoin( ExchangeDiscoveryEvents evts, boolean addedOnExchnage, GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, - boolean latePrimary, - Map<Object, List<List<ClusterNode>>> affCache + boolean latePrimary ) { if (addedOnExchnage) { if (!aff.lastVersion().equals(evts.topologyVersion())) @@ -2242,61 +2466,79 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']'; - List<List<ClusterNode>> curAff = aff.assignments(affTopVer); + assert aff.idealAssignmentRaw() != null : "Previous assignment is not available."; - assert aff.idealAssignment() != null : "Previous assignment is not available."; - - List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache()); + IdealAffinityAssignment idealAssignment = aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache()); + List<List<ClusterNode>> curAssignment = aff.assignments(affTopVer); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { - for (int p = 0; p < idealAssignment.size(); p++) { - List<ClusterNode> newNodes = idealAssignment.get(p); - List<ClusterNode> curNodes = curAff.get(p); + BitSet processedPartitions = new BitSet(curAssignment.size()); + + // Late affinity assignment to changed primaries. + for (ClusterNode joinedNode : evts.joinedServerNodes()) { + Set<Integer> primaries = idealAssignment.idealPrimaries(joinedNode); - ClusterNode curPrimary = !curNodes.isEmpty() ? curNodes.get(0) : null; - ClusterNode newPrimary = !newNodes.isEmpty() ? 
newNodes.get(0) : null; + for (int p : primaries) { + List<ClusterNode> curNodes = curAssignment.get(p); + + if (curNodes.isEmpty()) + continue; + + ClusterNode curPrimary = curNodes.get(0); - if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) { assert cctx.discovery().node(evts.topologyVersion(), curPrimary.id()) != null : curPrimary; - List<ClusterNode> nodes0 = latePrimaryAssignment(aff, + List<ClusterNode> idealNodes = idealAssignment.assignment().get(p); + + List<ClusterNode> newNodes = latePrimaryAssignment(aff, p, curPrimary, - newNodes, + idealNodes, rebalanceInfo); if (newAssignment == null) - newAssignment = new ArrayList<>(idealAssignment); + newAssignment = new ArrayList<>(idealAssignment.assignment()); + + newAssignment.set(p, newNodes); - newAssignment.set(p, nodes0); + processedPartitions.set(p); } } - } - if (newAssignment == null) - newAssignment = idealAssignment; + Set<Integer> partitionsWithChangedPrimary = aff.partitionPrimariesDifferentToIdeal(affTopVer); - aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); - } + for (int p : partitionsWithChangedPrimary) { + // Already processed above. + if (processedPartitions.get(p)) + continue; - /** - * @param aff Assignment cache. - * @param assign Assignment. - * @param affCache Assignments already calculated for other caches. - * @return Assignment. 
- */ - private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff, - List<List<ClusterNode>> assign, - Map<Object, List<List<ClusterNode>>> affCache) { - List<List<ClusterNode>> assign0 = affCache.get(aff.similarAffinityKey()); + List<ClusterNode> curNodes = curAssignment.get(p); - if (assign0 != null && assign0.equals(assign)) - assign = assign0; - else - affCache.put(aff.similarAffinityKey(), assign); + if (curNodes.isEmpty()) + continue; + + List<ClusterNode> idealOwners = idealAssignment.assignment().get(p); + + if (!curNodes.get(0).equals(idealOwners.get(0))) { + List<ClusterNode> newNodes = latePrimaryAssignment(aff, + p, + curNodes.get(0), + idealOwners, + rebalanceInfo); + + if (newAssignment == null) + newAssignment = new ArrayList<>(idealAssignment.assignment()); + + newAssignment.set(p, newNodes); + } + } + } + + if (newAssignment == null) + newAssignment = idealAssignment.assignment(); - return assign; + aff.initialize(evts.topologyVersion(), newAssignment); } /** @@ -2407,7 +2649,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap "Invalid affinity version [last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']'; List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer); - List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment(); + List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignmentRaw(); assert newAssignment != null; @@ -2436,7 +2678,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ", node=" + newPrimary + ", topVer=" + topVer + ']'; - List<ClusterNode> owners = top.owners(p); + List<ClusterNode> owners = top.owners(p, topVer); // It is essential that curPrimary node has partition in OWNING state. 
if (!owners.isEmpty() && !owners.contains(curPrimary)) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java index 23df8d4..b71abbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteLogger; @@ -57,13 +58,13 @@ public class ExchangeDiscoveryEvents { private DiscoveryEvent lastSrvEvt; /** All events. */ - private List<DiscoveryEvent> evts = new ArrayList<>(); + private List<DiscoveryEvent> evts = Collections.synchronizedList(new ArrayList<>()); - /** Server join flag. */ - private boolean srvJoin; + /** Joined server nodes. */ + private List<ClusterNode> joinedSrvNodes = Collections.synchronizedList(new ArrayList<>()); - /** Sever left flag. */ - private boolean srvLeft; + /** Left server nodes. */ + private List<ClusterNode> leftSrvNodes = Collections.synchronizedList(new ArrayList<>()); /** * @param fut Current exchange future. @@ -126,9 +127,9 @@ public class ExchangeDiscoveryEvents { srvEvtTopVer = new AffinityTopologyVersion(evt.topologyVersion(), 0); if (evt.type()== EVT_NODE_JOINED) - srvJoin = true; + joinedSrvNodes.add(evt.eventNode()); else if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) - srvLeft = !node.isClient(); + leftSrvNodes.add(evt.eventNode()); } } @@ -180,14 +181,28 @@ public class ExchangeDiscoveryEvents { * @return {@code True} if has event for server join. 
*/ public boolean hasServerJoin() { - return srvJoin; + return !joinedSrvNodes.isEmpty(); } /** * @return {@code True} if has event for server leave. */ public boolean hasServerLeft() { - return srvLeft; + return !leftSrvNodes.isEmpty(); + } + + /** + * + */ + public List<ClusterNode> joinedServerNodes() { + return joinedSrvNodes; + } + + /** + * + */ + public List<ClusterNode> leftServerNodes() { + return leftSrvNodes; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 1315c67..855923f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -137,7 +137,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { public List<List<ClusterNode>> idealAssignment() { assert !cctx.isLocal(); - return aff.idealAssignment(); + return aff.idealAssignmentRaw(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java index c6abe89..7115ac1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -169,7 +169,7 @@ public class CacheGroupAffinityMessage implements Message { return new CacheGroupAffinityMessage( assign, - aff.centralizedAffinityFunction() ? aff.idealAssignment() : null, + aff.centralizedAffinityFunction() ? 
aff.idealAssignmentRaw() : null, null ); }); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8e5caa7..ab17e85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1375,7 +1375,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (CacheGroupContext grp : cctx.cache().cacheGroups()) { GridAffinityAssignmentCache aff = grp.affinity(); - aff.initialize(initialVersion(), aff.idealAssignment()); + aff.initialize(initialVersion(), aff.idealAssignmentRaw()); cctx.exchange().exchangerUpdateHeartbeat(); } @@ -4429,7 +4429,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (int i = 0; i < grp.affinity().partitions(); i++) affAssignment.add(empty); - grp.affinity().idealAssignment(affAssignment); + grp.affinity().idealAssignment(initialVersion(), affAssignment); grp.affinity().initialize(initialVersion(), affAssignment); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index d43a623..026a54f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -583,7 +583,7 @@ public class 
GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert !exchFut.context().mergeExchanges(); affVer = exchFut.initialVersion(); - affAssignment = grp.affinity().idealAssignment(); + affAssignment = grp.affinity().idealAssignmentRaw(); } initPartitions(affVer, affAssignment, exchFut, updateSeq);