ignite-3477-master - drop FairAffinityFunction
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9b64246 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9b64246 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9b64246 Branch: refs/heads/ignite-3477-master Commit: f9b64246f4bce407cb52853bdf296270ece69036 Parents: 01b088f Author: Sergi Vladykin <sergi.vlady...@gmail.com> Authored: Tue Apr 11 12:42:19 2017 +0300 Committer: Sergi Vladykin <sergi.vlady...@gmail.com> Committed: Tue Apr 11 12:42:19 2017 +0300 ---------------------------------------------------------------------- .../affinity/fair/FairAffinityFunction.java | 1159 ------------------ .../cache/affinity/fair/package-info.java | 21 - .../utils/PlatformConfigurationUtils.java | 40 +- .../GridCacheAffinityBackupsSelfTest.java | 20 +- .../ignite/IgniteCacheAffinitySelfTest.java | 12 +- .../affinity/AffinityClientNodeSelfTest.java | 15 +- .../affinity/AffinityHistoryCleanupTest.java | 3 +- .../fair/FairAffinityDynamicCacheSelfTest.java | 86 -- ...airAffinityFunctionBackupFilterSelfTest.java | 44 - ...ffinityFunctionExcludeNeighborsSelfTest.java | 31 - .../fair/FairAffinityFunctionNodesSelfTest.java | 247 ---- .../fair/FairAffinityFunctionSelfTest.java | 31 - .../affinity/fair/FairAffinityNodesRestart.java | 130 -- .../local/LocalAffinityFunctionTest.java | 4 +- ...CacheExchangeMessageDuplicatedStateTest.java | 33 - .../cache/CrossCacheTxRandomOperationsTest.java | 36 +- .../GridCacheVersionTopologyChangeTest.java | 5 - .../IgniteClientAffinityAssignmentSelfTest.java | 20 +- ...eDynamicCacheStartNoExchangeTimeoutTest.java | 3 - ...eLateAffinityAssignmentFairAffinityTest.java | 32 - ...teCacheClientNodePartitionsExchangeTest.java | 18 +- .../IgniteCacheTxFairAffinityNodeJoinTest.java | 35 - ...arDisabledFairAffinityPutGetRestartTest.java | 35 - ...ledFairAffinityMultiNodeFullApiSelfTest.java | 36 - .../AtomicPutAllChangingTopologyTest.java | 4 +- ...nlyFairAffinityMultiNodeFullApiSelfTest.java | 36 - ...micFairAffinityMultiNodeFullApiSelfTest.java | 35 - ...ledFairAffinityMultiNodeFullApiSelfTest.java | 36 - ...derFairAffinityMultiNodeFullApiSelfTest.java | 36 - ...nlyFairAffinityMultiNodeFullApiSelfTest.java | 35 - ...xcludeNeighborsMultiNodeFullApiSelfTest.java | 36 - ...tedFairAffinityMultiNodeFullApiSelfTest.java | 35 - ...nedFairAffinityMultiNodeFullApiSelfTest.java | 37 - ...OnlyFairAffinityMultiJvmFullApiSelfTest.java | 31 - ...omicFairAffinityMultiJvmFullApiSelfTest.java | 31 - ...bledFairAffinityMultiJvmFullApiSelfTest.java | 36 - ...rderFairAffinityMultiJvmFullApiSelfTest.java | 31 - ...OnlyFairAffinityMultiJvmFullApiSelfTest.java | 31 - ...onedFairAffinityMultiJvmFullApiSelfTest.java | 31 - ...bledFairAffinityMultiJvmFullApiSelfTest.java | 31 - .../configvariations/ConfigVariations.java | 2 - .../IgniteCacheFailoverTestSuite.java | 4 - ...IgniteCacheFullApiMultiJvmSelfTestSuite.java | 15 - .../IgniteCacheFullApiSelfTestSuite.java | 24 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 9 +- .../testsuites/IgniteCacheTestSuite2.java | 4 - .../testsuites/IgniteCacheTestSuite5.java | 10 - 47 files changed, 37 insertions(+), 2639 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java deleted file mode 100644 index fb19fb5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java +++ /dev/null @@ -1,1159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.fair; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.RandomAccess; -import java.util.UUID; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.affinity.AffinityCentralizedFunction; -import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.AffinityFunctionContext; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.processors.cache.GridCacheUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.resources.LoggerResource; -import org.jetbrains.annotations.Nullable; - -/** - * Fair affinity function which tries to ensure that all nodes get equal number of partitions with - * minimum amount of reassignments between existing nodes. - * This function supports the following configuration: - * <ul> - * <li> - * {@code partitions} - Number of partitions to spread across nodes. - * </li> - * <li> - * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors - * from being backups of each other. This flag can be ignored in cases when topology has no enough nodes - * for assign backups. - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * </li> - * <li> - * {@code backupFilter} - Optional filter for back up nodes. If provided, then only - * nodes that pass this filter will be selected as backup nodes. If not provided, then - * primary and backup nodes will be selected out of all nodes available for this cache. - * </li> - * </ul> - * <p> - * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method. - */ -@AffinityCentralizedFunction -public class FairAffinityFunction implements AffinityFunction { - /** Default partition count. */ - public static final int DFLT_PART_CNT = 256; - - /** */ - private static final long serialVersionUID = 0L; - - /** Ascending comparator. */ - private static final Comparator<PartitionSet> ASC_CMP = new PartitionSetComparator(); - - /** Descending comparator. */ - private static final Comparator<PartitionSet> DESC_CMP = Collections.reverseOrder(ASC_CMP); - - /** Number of partitions. */ - private int parts; - - /** Exclude neighbors flag. */ - private boolean exclNeighbors; - - /** Exclude neighbors warning. */ - private transient boolean exclNeighborsWarn; - - /** Logger instance. */ - @LoggerResource - private transient IgniteLogger log; - - /** Optional backup filter. First node is primary, second node is a node being tested. */ - private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; - - /** Optional affinity backups filter. The first node is a node being tested, the second is a list of nodes that are already assigned for a given partition (primary node is the first in the list). */ - private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter; - - /** - * Empty constructor with all defaults. - */ - public FairAffinityFunction() { - this(false); - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other - * and specified number of backups. - * <p> - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - */ - public FairAffinityFunction(boolean exclNeighbors) { - this(exclNeighbors, DFLT_PART_CNT); - } - - /** - * @param parts Number of partitions. - */ - public FairAffinityFunction(int parts) { - this(false, parts); - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, - * and specified number of backups and partitions. - * <p> - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - * @param parts Total number of partitions. - */ - public FairAffinityFunction(boolean exclNeighbors, int parts) { - this(exclNeighbors, parts, null); - } - - /** - * Initializes optional counts for replicas and backups. - * <p> - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @param parts Total number of partitions. - * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected - * from all nodes that pass this filter. First argument for this filter is primary node, and second - * argument is node being tested. - */ - public FairAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this(false, parts, backupFilter); - } - - /** - * Private constructor. - * - * @param exclNeighbors Exclude neighbors flag. - * @param parts Partitions count. - * @param backupFilter Backup filter. - */ - private FairAffinityFunction(boolean exclNeighbors, int parts, - IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - A.ensure(parts > 0, "parts > 0"); - A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <=" + CacheConfiguration.MAX_PARTITIONS_COUNT); - - this.exclNeighbors = exclNeighbors; - this.parts = parts; - this.backupFilter = backupFilter; - } - - /** - * Gets total number of key partitions. To ensure that all partitions are - * equally distributed across all nodes, please make sure that this - * number is significantly larger than a number of nodes. Also, partition - * size should be relatively small. Try to avoid having partitions with more - * than quarter million keys. - * <p> - * Note that for fully replicated caches this method should always - * return {@code 1}. - * - * @return Total partition count. - */ - public int getPartitions() { - return parts; - } - - /** - * Sets total number of partitions. - * - * @param parts Total number of partitions. - * @return {@code this} for chaining. - */ - public FairAffinityFunction setPartitions(int parts) { - A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT); - - this.parts = parts; - - return this; - } - - - /** - * Gets optional backup filter. If not {@code null}, backups will be selected - * from all nodes that pass this filter. First node passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @return Optional backup filter. - */ - @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { - return backupFilter; - } - - /** - * Sets optional backup filter. If provided, then backups will be selected from all - * nodes that pass this filter. First node being passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @param backupFilter Optional backup filter. - * @deprecated Use {@code affinityBackupFilter} instead. - * @return {@code this} for chaining. - */ - @Deprecated - public FairAffinityFunction setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this.backupFilter = backupFilter; - - return this; - } - - /** - * Gets optional backup filter. If not {@code null}, backups will be selected - * from all nodes that pass this filter. First node passed to this filter is a node being tested, - * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is the first in the list). - * <p> - * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @return Optional backup filter. - */ - @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() { - return affinityBackupFilter; - } - - /** - * Sets optional backup filter. If provided, then backups will be selected from all - * nodes that pass this filter. First node being passed to this filter is a node being tested, - * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is the first in the list). - * <p> - * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @param affinityBackupFilter Optional backup filter. - * @return {@code this} for chaining. - */ - public FairAffinityFunction setAffinityBackupFilter( - @Nullable IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter) { - this.affinityBackupFilter = affinityBackupFilter; - - return this; - } - - /** - * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @return {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public boolean isExcludeNeighbors() { - return exclNeighbors; - } - - /** - * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. - * @return {@code this} for chaining. - */ - public FairAffinityFunction setExcludeNeighbors(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - - return this; - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext ctx) { - List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); - - if (topSnapshot.size() == 1) { - ClusterNode primary = topSnapshot.get(0); - - return Collections.nCopies(parts, Collections.singletonList(primary)); - } - - Map<UUID, Collection<ClusterNode>> neighborhoodMap = exclNeighbors - ? GridCacheUtils.neighbors(ctx.currentTopologySnapshot()) - : null; - - List<List<ClusterNode>> assignment = createCopy(ctx, neighborhoodMap); - - int backups = ctx.backups(); - - int tiers = backups == Integer.MAX_VALUE ? topSnapshot.size() : Math.min(backups + 1, topSnapshot.size()); - - // Per tier pending partitions. - Map<Integer, Queue<Integer>> pendingParts = new HashMap<>(); - - FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot, neighborhoodMap); - - for (int tier = 0; tier < tiers; tier++) { - // Check if this is a new tier and add pending partitions. - Queue<Integer> pending = pendingParts.get(tier); - - for (int part = 0; part < parts; part++) { - if (fullMap.assignments.get(part).size() < tier + 1) { - if (pending == null) - pendingParts.put(tier, pending = new LinkedList<>()); - - if (!pending.contains(part)) - pending.add(part); - } - } - - // Assign pending partitions, if any. - assignPending(tier, pendingParts, fullMap, topSnapshot, false); - - // Balance assignments. - boolean balanced = balance(tier, pendingParts, fullMap, topSnapshot, false); - - if (!balanced && exclNeighbors) { - assignPending(tier, pendingParts, fullMap, topSnapshot, true); - - balance(tier, pendingParts, fullMap, topSnapshot, true); - - if (!exclNeighborsWarn) { - LT.warn(log, "Affinity function excludeNeighbors property is ignored " + - "because topology has no enough nodes to assign backups."); - - exclNeighborsWarn = true; - } - } - } - - return fullMap.assignments; - } - - /** {@inheritDoc} */ - @Override public void reset() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int partitions() { - return parts; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - if (key == null) - throw new IllegalArgumentException("Null key is passed for a partition calculation. " + - "Make sure that an affinity key that is used is initialized properly."); - - return U.safeAbs(hash(key.hashCode())) % parts; - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - // No-op. - } - - /** - * Assigns pending (unassigned) partitions to nodes. - * - * @param tier Tier to assign (0 is primary, 1 - 1st backup,...). - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. - * @param allowNeighbors Allow neighbors nodes for partition. - */ - private void assignPending(int tier, - Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, - List<ClusterNode> topSnapshot, - boolean allowNeighbors) - { - Queue<Integer> pending = pendingMap.get(tier); - - if (F.isEmpty(pending)) - return; - - int idealPartCnt = parts / topSnapshot.size(); - - Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier); - - PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false); - - // First iterate over underloaded nodes. - assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false, allowNeighbors); - - if (!pending.isEmpty() && !underloadedNodes.isEmpty()) { - // Same, forcing updates. - assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true, allowNeighbors); - } - - if (!pending.isEmpty()) - assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot, allowNeighbors); - - if (pending.isEmpty()) - pendingMap.remove(tier); - } - - /** - * Assigns pending partitions to underloaded nodes. - * - * @param tier Tier to assign. - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param underloadedNodes Underloaded nodes. - * @param topSnapshot Topology snapshot. - * @param force {@code True} if partitions should be moved. - * @param allowNeighbors Allow neighbors nodes for partition. - */ - private void assignPendingToUnderloaded( - int tier, - Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, - PrioritizedPartitionMap underloadedNodes, - Collection<ClusterNode> topSnapshot, - boolean force, - boolean allowNeighbors) { - Iterator<Integer> it = pendingMap.get(tier).iterator(); - - int ideal = parts / topSnapshot.size(); - - while (it.hasNext()) { - int part = it.next(); - - for (PartitionSet set : underloadedNodes.assignments()) { - ClusterNode node = set.node(); - - assert node != null; - - if (fullMap.assign(part, tier, node, pendingMap, force, allowNeighbors)) { - // We could add partition to partition map without forcing, remove partition from pending. - it.remove(); - - if (set.size() <= ideal) - underloadedNodes.remove(set.nodeId()); - else - underloadedNodes.update(); - - break; // for, continue to the next partition. - } - } - - if (underloadedNodes.isEmpty()) - return; - } - } - - /** - * Spreads pending partitions equally to all nodes in topology snapshot. - * - * @param tier Tier to assign. - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. - * @param allowNeighbors Allow neighbors nodes for partition. - */ - private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, List<ClusterNode> topSnapshot, boolean allowNeighbors) { - Iterator<Integer> it = pendingMap.get(tier).iterator(); - - int idx = 0; - - while (it.hasNext()) { - int part = it.next(); - - int i = idx; - - boolean assigned = false; - - do { - ClusterNode node = topSnapshot.get(i); - - if (fullMap.assign(part, tier, node, pendingMap, false, allowNeighbors)) { - it.remove(); - - assigned = true; - } - - i = (i + 1) % topSnapshot.size(); - - if (assigned) - idx = i; - } while (i != idx); - - if (!assigned) { - do { - ClusterNode node = topSnapshot.get(i); - - if (fullMap.assign(part, tier, node, pendingMap, true, allowNeighbors)) { - it.remove(); - - assigned = true; - } - - i = (i + 1) % topSnapshot.size(); - - if (assigned) - idx = i; - } while (i != idx); - } - - if (!assigned && (!exclNeighbors || exclNeighbors && allowNeighbors)) - throw new IllegalStateException("Failed to find assignable node for partition."); - } - } - - /** - * Tries to balance assignments between existing nodes in topology. - * - * @param tier Tier to assign. - * @param pendingParts Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. - * @param allowNeighbors Allow neighbors nodes for partition. - */ - private boolean balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap, - Collection<ClusterNode> topSnapshot, boolean allowNeighbors) { - int idealPartCnt = parts / topSnapshot.size(); - - Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier); - - PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, idealPartCnt, false); - PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, idealPartCnt, true); - - do { - boolean retry = false; - - for (PartitionSet overloaded : overloadedNodes.assignments()) { - for (Integer part : overloaded.partitions()) { - boolean assigned = false; - - for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), pendingParts, false, allowNeighbors)) { - // Size of partition sets has changed. - if (overloaded.size() <= idealPartCnt) - overloadedNodes.remove(overloaded.nodeId()); - else - overloadedNodes.update(); - - if (underloaded.size() >= idealPartCnt) - underloadedNodes.remove(underloaded.nodeId()); - else - underloadedNodes.update(); - - assigned = true; - - retry = true; - - break; - } - } - - if (!assigned) { - for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), pendingParts, true, allowNeighbors)) { - // Size of partition sets has changed. - if (overloaded.size() <= idealPartCnt) - overloadedNodes.remove(overloaded.nodeId()); - else - overloadedNodes.update(); - - if (underloaded.size() >= idealPartCnt) - underloadedNodes.remove(underloaded.nodeId()); - else - underloadedNodes.update(); - - retry = true; - - break; - } - } - } - - if (retry) - break; // for part. - } - - if (retry) - break; // for overloaded. - } - - if (!retry) - break; - } - while (true); - - return underloadedNodes.isEmpty(); - } - - /** - * Constructs underloaded or overloaded partition map. - * - * @param mapping Mapping to filter. - * @param idealPartCnt Ideal number of partitions per node. - * @param overloaded {@code True} if should create overloaded map, {@code false} for underloaded. - * @return Prioritized partition map. - */ - private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> mapping, int idealPartCnt, boolean overloaded) { - assert mapping != null; - - PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? DESC_CMP : ASC_CMP); - - for (PartitionSet set : mapping.values()) { - if ((overloaded && set.size() > idealPartCnt) || (!overloaded && set.size() < idealPartCnt)) - res.add(set); - } - - return res; - } - - /** - * Creates copy of previous partition assignment. - * - * @param ctx Affinity function context. - * @param neighborhoodMap Neighbors nodes grouped by target node. - * @return Assignment copy and per node partition map. - */ - private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx, - Map<UUID, Collection<ClusterNode>> neighborhoodMap) - { - DiscoveryEvent discoEvt = ctx.discoveryEvent(); - - UUID leftNodeId = (discoEvt == null || discoEvt.type() == EventType.EVT_NODE_JOINED) - ? null - : discoEvt.eventNode().id(); - - List<List<ClusterNode>> cp = new ArrayList<>(parts); - - for (int part = 0; part < parts; part++) { - List<ClusterNode> partNodes = ctx.previousAssignment(part); - - List<ClusterNode> partNodesCp; - - if (partNodes == null) - partNodesCp = new ArrayList<>(); - else - partNodesCp = copyAssigments(neighborhoodMap, partNodes, leftNodeId); - - cp.add(partNodesCp); - } - - return cp; - } - - /** - * @param neighborhoodMap Neighbors nodes grouped by target node. - * @param partNodes Partition nodes. - * @param leftNodeId Left node id. - */ - private List<ClusterNode> copyAssigments(Map<UUID, Collection<ClusterNode>> neighborhoodMap, - List<ClusterNode> partNodes, UUID leftNodeId) { - final List<ClusterNode> partNodesCp = new ArrayList<>(partNodes.size()); - - for (ClusterNode node : partNodes) { - if (node.id().equals(leftNodeId)) - continue; - - boolean containsNeighbor = false; - - if (neighborhoodMap != null) - containsNeighbor = F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return partNodesCp.contains(node); - } - }); - - if (!containsNeighbor) - partNodesCp.add(node); - } - - return partNodesCp; - } - - /** - * - */ - private static class PartitionSetComparator implements Comparator<PartitionSet>, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public int compare(PartitionSet o1, PartitionSet o2) { - return Integer.compare(o1.parts.size(), o2.parts.size()); - } - } - - /** - * Prioritized partition map. Ordered structure in which nodes are ordered in ascending or descending order - * by number of partitions assigned to a node. - */ - private static class PrioritizedPartitionMap { - /** Comparator. */ - private Comparator<PartitionSet> cmp; - - /** Assignment map. */ - private Map<UUID, PartitionSet> assignmentMap = new HashMap<>(); - - /** Assignment list, ordered according to comparator. */ - private List<PartitionSet> assignmentList = new ArrayList<>(); - - /** - * @param cmp Comparator. - */ - private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) { - this.cmp = cmp; - } - - /** - * @param set Partition set to add. - */ - public void add(PartitionSet set) { - PartitionSet old = assignmentMap.put(set.nodeId(), set); - - if (old == null) { - assignmentList.add(set); - - update(); - } - } - - /** - * Sorts assignment list. - */ - public void update() { - Collections.sort(assignmentList, cmp); - } - - /** - * @return Sorted assignment list. - */ - public List<PartitionSet> assignments() { - return assignmentList; - } - - /** - * @param uuid Uuid. - */ - public void remove(UUID uuid) { - PartitionSet rmv = assignmentMap.remove(uuid); - - assignmentList.remove(rmv); - } - - /** - * - */ - public boolean isEmpty() { - return assignmentList.isEmpty(); - } - } - - /** - * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary - * maps consistent. - */ - @SuppressWarnings("unchecked") - private class FullAssignmentMap { - /** Per-tier assignment maps. */ - private Map<UUID, PartitionSet>[] tierMaps; - - /** Full assignment map. */ - private Map<UUID, PartitionSet> fullMap; - - /** Resulting assignment. */ - private List<List<ClusterNode>> assignments; - - /** Neighborhood map. */ - private final Map<UUID, Collection<ClusterNode>> neighborhoodMap; - - /** - * @param tiers Number of tiers. - * @param assignments Assignments to modify. - * @param topSnapshot Topology snapshot. - * @param neighborhoodMap Neighbors nodes grouped by target node. - */ - private FullAssignmentMap(int tiers, - List<List<ClusterNode>> assignments, - Collection<ClusterNode> topSnapshot, - Map<UUID, Collection<ClusterNode>> neighborhoodMap) - { - this.assignments = assignments; - this.neighborhoodMap = neighborhoodMap; - this.tierMaps = new Map[tiers]; - - for (int tier = 0; tier < tiers; tier++) - tierMaps[tier] = assignments(tier, topSnapshot); - - fullMap = assignments(-1, topSnapshot); - } - - /** - * Tries to assign partition to given node on specified tier. If force is false, assignment will succeed - * only if this partition is not already assigned to a node. If force is true, then assignment will succeed - * only if partition is not assigned to a tier with number less than passed in. Assigned partition from - * greater tier will be moved to pending queue. - * - * @param part Partition to assign. - * @param tier Tier number to assign. - * @param node Node to move partition to. - * @param pendingParts per tier pending partitions map. - * @param force Force flag. - * @param allowNeighbors Allow neighbors nodes for partition. - * @return {@code True} if assignment succeeded. - */ - boolean assign(int part, - int tier, - ClusterNode node, - Map<Integer, Queue<Integer>> pendingParts, boolean force, - boolean allowNeighbors) - { - UUID nodeId = node.id(); - - if (isAssignable(part, tier, node, allowNeighbors)) { - tierMaps[tier].get(nodeId).add(part); - - fullMap.get(nodeId).add(part); - - List<ClusterNode> assignment = assignments.get(part); - - if (assignment.size() <= tier) - assignment.add(node); - else { - ClusterNode oldNode = assignment.set(tier, node); - - if (oldNode != null) { - UUID oldNodeId = oldNode.id(); - - tierMaps[tier].get(oldNodeId).remove(part); - fullMap.get(oldNodeId).remove(part); - } - } - - return true; - } - else if (force) { - assert !tierMaps[tier].get(nodeId).contains(part); - - // Check previous tiers first. - for (int t = 0; t < tier; t++) { - if (tierMaps[t].get(nodeId).contains(part)) - return false; - } - - // Partition is on some lower tier, switch it. - for (int t = tier + 1; t < tierMaps.length; t++) { - if (tierMaps[t].get(nodeId).contains(part)) { - ClusterNode oldNode = assignments.get(part).get(tier); - - // Move partition from level t to tier. - assignments.get(part).set(tier, node); - assignments.get(part).set(t, null); - - if (oldNode != null) { - tierMaps[tier].get(oldNode.id()).remove(part); - fullMap.get(oldNode.id()).remove(part); - } - - tierMaps[tier].get(nodeId).add(part); - tierMaps[t].get(nodeId).remove(part); - - Queue<Integer> pending = pendingParts.get(t); - - if (pending == null) - pendingParts.put(t, pending = new LinkedList<>()); - - pending.add(part); - - return true; - } - } - - return false; - } - - // !force. - return false; - } - - /** - * Gets tier mapping. - * - * @param tier Tier to get mapping. - * @return Per node map. - */ - public Map<UUID, PartitionSet> tierMapping(int tier) { - return tierMaps[tier]; - } - - /** - * @param part Partition. - * @param tier Tier. - * @param node Node. - * @param allowNeighbors Allow neighbors. - * @return {@code true} if the partition is assignable to the node. - */ - private boolean isAssignable(int part, int tier, final ClusterNode node, boolean allowNeighbors) { - if (containsPartition(part, node)) - return false; - - if (exclNeighbors) - return allowNeighbors || !neighborsContainPartition(node, part); - else if (affinityBackupFilter != null) { - List<ClusterNode> assignment = assignments.get(part); - - if (assignment.isEmpty()) - return true; - - List<ClusterNode> newAssignment; - - if (tier == 0) { - for (int t = 1; t < assignment.size(); t++) { - newAssignment = new ArrayList<>(assignment.size() - 1); - - newAssignment.add(node); - - if (t != 1) - newAssignment.addAll(assignment.subList(1, t)); - - if (t + 1 < assignment.size()) - newAssignment.addAll(assignment.subList(t + 1, assignment.size())); - - if (!affinityBackupFilter.apply(assignment.get(t), newAssignment)) - return false; - } - - return true; - } - else if (tier < assignment.size()) { - newAssignment = new ArrayList<>(assignment.size() - 1); - - int i = 0; - - for (ClusterNode assignmentNode: assignment) { - if (i != tier) - newAssignment.add(assignmentNode); - - i++; - } - } - else - newAssignment = assignment; - - return affinityBackupFilter.apply(node, newAssignment); - } - else if (backupFilter != null) { - if (tier == 0) { - List<ClusterNode> assignment = assignments.get(part); - - if (assignment.isEmpty()) - return true; - - List<ClusterNode> backups = assignment.subList(1, assignment.size()); - - return !F.exist(backups, new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode n) { - return !backupFilter.apply(node, n); - } - }); - } - else - return (backupFilter.apply(assignments.get(part).get(0), node)); - } - else - return true; - } - - /** - * @param part Partition. - * @param node Node. - */ - private boolean containsPartition(int part, ClusterNode node) { - return fullMap.get(node.id()).contains(part); - } - - /** - * @param node Node. - * @param part Partition. - */ - private boolean neighborsContainPartition(ClusterNode node, final int part) { - return F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode n) { - return fullMap.get(n.id()).contains(part); - } - }); - } - - /** - * Constructs assignments map for specified tier. - * - * @param tier Tier number, -1 for all tiers altogether. - * @param topSnapshot Topology snapshot. - * @return Assignment map. - */ - private Map<UUID, PartitionSet> assignments(int tier, Collection<ClusterNode> topSnapshot) { - Map<UUID, PartitionSet> tmp = new LinkedHashMap<>(); - - for (int part = 0; part < assignments.size(); part++) { - List<ClusterNode> nodes = assignments.get(part); - - assert nodes instanceof RandomAccess; - - if (nodes.size() <= tier) - continue; - - int start = tier < 0 ? 0 : tier; - int end = tier < 0 ? nodes.size() : tier + 1; - - for (int i = start; i < end; i++) { - ClusterNode n = nodes.get(i); - - PartitionSet set = tmp.get(n.id()); - - if (set == null) - tmp.put(n.id(), set = new PartitionSet(n)); - - set.add(part); - } - } - - if (tmp.size() < topSnapshot.size()) { - for (ClusterNode node : topSnapshot) { - if (!tmp.containsKey(node.id())) - tmp.put(node.id(), new PartitionSet(node)); - } - } - - return tmp; - } - } - - /** - * Applies a supplemental hash function to a given hashCode, which - * defends against poor quality hash functions. - * - * @param h Hash code. - * @return Enhanced hash code. - */ - private static int hash(int h) { - // Spread bits to regularize both segment and index locations, - // using variant of single-word Wang/Jenkins hash. - h += (h << 15) ^ 0xffffcd7d; - h ^= (h >>> 10); - h += (h << 3); - h ^= (h >>> 6); - h += (h << 2) + (h << 14); - return h ^ (h >>> 16); - } - - /** - * - */ - @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") - private static class PartitionSet { - /** */ - private ClusterNode node; - - /** Partitions. */ - private Collection<Integer> parts = new LinkedList<>(); - - /** - * @param node Node. - */ - private PartitionSet(ClusterNode node) { - this.node = node; - } - - /** - * @return Node. - */ - private ClusterNode node() { - return node; - } - - /** - * @return Node ID. - */ - private UUID nodeId() { - return node.id(); - } - - /** - * @return Partition set size. - */ - private int size() { - return parts.size(); - } - - /** - * Adds partition to partition set. - * - * @param part Partition to add. - * @return {@code True} if partition was added, {@code false} if partition already exists. - */ - private boolean add(int part) { - if (!parts.contains(part)) { - parts.add(part); - - return true; - } - - return false; - } - - /** - * @param part Partition to remove. - */ - private void remove(Integer part) { - parts.remove(part); // Remove object, not index. - } - - /** - * @return Partitions. - */ - @SuppressWarnings("TypeMayBeWeakened") - private Collection<Integer> partitions() { - return parts; - } - - /** - * Checks if partition set contains given partition. - * - * @param part Partition to check. - * @return {@code True} if partition set contains given partition. - */ - private boolean contains(int part) { - return parts.contains(part); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + ", parts=" + parts + ']'; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package-info.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package-info.java deleted file mode 100644 index bd706cb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains fair cache affinity for partitioned cache. - */ -package org.apache.ignite.cache.affinity.fair; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index d7395df..46a9899 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -19,26 +19,23 @@ package org.apache.ignite.internal.processors.platform.utils; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Collections; import java.util.ServiceLoader; +import java.util.Set; import javax.cache.configuration.Factory; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.binary.BinaryArrayIdentityResolver; -import org.apache.ignite.internal.binary.BinaryIdentityResolver; import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.binary.BinaryRawWriter; -import org.apache.ignite.binary.BinaryTypeConfiguration; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; @@ -48,7 +45,6 @@ import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.QueryIndexType; import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.eviction.EvictionPolicy; import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy; @@ -59,16 +55,18 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.binary.BinaryArrayIdentityResolver; +import org.apache.ignite.internal.binary.BinaryIdentityResolver; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; +import org.apache.ignite.internal.processors.platform.plugin.cache.PlatformCachePluginConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction; import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative; import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration; -import org.apache.ignite.internal.processors.platform.plugin.cache.PlatformCachePluginConfiguration; import org.apache.ignite.plugin.CachePluginConfiguration; import org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosure; import org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosureFactory; @@ -324,11 +322,7 @@ public class PlatformConfigurationUtils { switch (plcTyp) { case 1: { - FairAffinityFunction f = new FairAffinityFunction(); - f.setPartitions(partitions); - f.setExcludeNeighbors(exclNeighbours); - baseFunc = f; - break; + throw new IllegalStateException("FairAffinityFunction"); } case 2: { RendezvousAffinityFunction f = new RendezvousAffinityFunction(); @@ -368,15 +362,7 @@ public class PlatformConfigurationUtils { if (f instanceof PlatformDotNetAffinityFunction) f = ((PlatformDotNetAffinityFunction)f).getFunc(); - if (f instanceof FairAffinityFunction) { - out.writeByte((byte) 1); - - FairAffinityFunction f0 = (FairAffinityFunction) f; - out.writeInt(f0.getPartitions()); - out.writeBoolean(f0.isExcludeNeighbors()); - out.writeByte((byte) 0); // override flags - out.writeObject(null); // user func - } else if (f instanceof RendezvousAffinityFunction) { + if (f instanceof RendezvousAffinityFunction) { out.writeByte((byte) 2); RendezvousAffinityFunction f0 = (RendezvousAffinityFunction) f; @@ -388,13 +374,7 @@ public class PlatformConfigurationUtils { PlatformAffinityFunction f0 = (PlatformAffinityFunction) f; AffinityFunction baseFunc = f0.getBaseFunc(); - if (baseFunc instanceof FairAffinityFunction) { - out.writeByte((byte) 1); - out.writeInt(f0.partitions()); - out.writeBoolean(((FairAffinityFunction) baseFunc).isExcludeNeighbors()); - out.writeByte(f0.getOverrideFlags()); - out.writeObject(f0.getUserFunc()); - } else if (baseFunc instanceof RendezvousAffinityFunction) { + if (baseFunc instanceof RendezvousAffinityFunction) { out.writeByte((byte) 2); out.writeInt(f0.partitions()); out.writeBoolean(((RendezvousAffinityFunction) baseFunc).isExcludeNeighbors()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java index ada9477..b8a931f 100644 --- a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.UUID; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -41,9 +40,6 @@ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest { /** Number of backups. */ private int backups; - /** Affinity function. */ - private int funcType; - /** */ private int nodesCnt = 5; @@ -57,7 +53,7 @@ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest { ccfg.setCacheMode(CacheMode.PARTITIONED); ccfg.setBackups(backups); - ccfg.setAffinity(funcType == 0 ? new FairAffinityFunction() : new RendezvousAffinityFunction()); + ccfg.setAffinity(new RendezvousAffinityFunction()); cfg.setCacheConfiguration(ccfg); @@ -69,25 +65,15 @@ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest { */ public void testRendezvousBackups() throws Exception { for (int i = 0; i < nodesCnt; i++) - checkBackups(i, 1); - } - - /** - * @throws Exception If failed. - */ - public void testFairBackups() throws Exception { - for (int i = 0; i < nodesCnt; i++) - checkBackups(i, 0); + checkBackups(i); } /** * @param backups Number of backups. - * @param funcType Affinity function type. * @throws Exception If failed. */ - private void checkBackups(int backups, int funcType) throws Exception { + private void checkBackups(int backups) throws Exception { this.backups = backups; - this.funcType = funcType; startGridsMultiThreaded(nodesCnt, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java index 26dc2dc..21e54db 100644 --- a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -45,9 +44,6 @@ public class IgniteCacheAffinitySelfTest extends IgniteCacheAbstractTest { private int GRID_CNT = 3; /** Cache name */ - private final String CACHE1 = "Fair"; - - /** Cache name */ private final String CACHE2 = "Rendezvous"; /** {@inheritDoc} */ @@ -61,10 +57,6 @@ public class IgniteCacheAffinitySelfTest extends IgniteCacheAbstractTest { CacheConfiguration cache0 = cacheConfiguration(null); - CacheConfiguration cache1 = cacheConfiguration(null); - cache1.setName(CACHE1); - cache1.setAffinity(new FairAffinityFunction()); - CacheConfiguration cache2 = cacheConfiguration(null); cache2.setName(CACHE2); cache2.setAffinity(new RendezvousAffinityFunction()); @@ -72,7 +64,7 @@ public class IgniteCacheAffinitySelfTest extends IgniteCacheAbstractTest { if (igniteInstanceName.contains("0")) cfg.setCacheConfiguration(cache0); else - cfg.setCacheConfiguration(cache0, cache1, cache2); + cfg.setCacheConfiguration(cache0, cache2); return cfg; } @@ -113,8 +105,6 @@ public class IgniteCacheAffinitySelfTest extends IgniteCacheAbstractTest { */ private void checkAffinity() { checkAffinity(grid(0).affinity(null), internalCache(1, null).affinity()); - checkAffinity(grid(0).affinity(CACHE1), internalCache(1, CACHE1).affinity()); - checkAffinity(grid(0).affinity(CACHE1), internalCache(1, CACHE1).affinity()); checkAffinity(grid(0).affinity(CACHE2), internalCache(1, CACHE2).affinity()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java index 0a3f96c..04c6061 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.cache.affinity; import java.util.Collection; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -52,9 +51,6 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { private static final String CACHE2 = "cache2"; /** */ - private static final String CACHE3 = "cache3"; - - /** */ private static final String CACHE4 = "cache4"; /** */ @@ -79,13 +75,6 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { ccfg2.setName(CACHE2); ccfg2.setAffinity(new RendezvousAffinityFunction()); - CacheConfiguration ccfg3 = new CacheConfiguration(); - - ccfg3.setBackups(1); - ccfg3.setName(CACHE3); - ccfg3.setAffinity(new FairAffinityFunction()); - ccfg3.setNodeFilter(new TestNodesFilter()); - CacheConfiguration ccfg4 = new CacheConfiguration(); ccfg4.setCacheMode(REPLICATED); @@ -103,7 +92,7 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(ccfg5); } else - cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4); + cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg4); return cfg; } @@ -130,8 +119,6 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { checkCache(CACHE2, 2); - checkCache(CACHE3, 2); - checkCache(CACHE4, 3); checkCache(CACHE5, 2); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java index 06964e0..87b472d 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -63,7 +62,7 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest { CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setName("static-cache-" + i); - ccfg.setAffinity(i % 2 == 0 ? new RendezvousAffinityFunction() : new FairAffinityFunction()); + ccfg.setAffinity(new RendezvousAffinityFunction()); ccfgs[i] = ccfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java deleted file mode 100644 index 65f08c2..0000000 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.fair; - -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGridsMultiThreaded(3); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testStartStopCache() throws Exception { - CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); - - cacheCfg.setCacheMode(CacheMode.PARTITIONED); - cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); - cacheCfg.setBackups(1); - cacheCfg.setName("test"); - cacheCfg.setAffinity(new FairAffinityFunction()); - - final IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg); - - for (int i = 0; i < 10_000; i++) - cache.put(i, i); - - IgniteInternalFuture<Object> destFut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - ignite(0).destroyCache(cache.getName()); - - return null; - } - }); - - destFut.get(5000L); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java deleted file mode 100644 index 7fddf30..0000000 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.fair; - -import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.AffinityFunctionBackupFilterAbstractSelfTest; - -/** - * Tests backup filter for {@link FairAffinityFunction}. - */ -public class FairAffinityFunctionBackupFilterSelfTest extends AffinityFunctionBackupFilterAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected AffinityFunction affinityFunction() { - FairAffinityFunction aff = new FairAffinityFunction(false); - - aff.setBackupFilter(backupFilter); - - return aff; - } - - /** {@inheritDoc} */ - @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter() { - FairAffinityFunction aff = new FairAffinityFunction(false); - - aff.setAffinityBackupFilter(affinityBackupFilter); - - return aff; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java deleted file mode 100644 index 4182cd3..0000000 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.fair; - -import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.AffinityFunctionExcludeNeighborsAbstractSelfTest; - -/** - * Tests exclude neighbors flag for {@link FairAffinityFunction}. - */ -public class FairAffinityFunctionExcludeNeighborsSelfTest extends AffinityFunctionExcludeNeighborsAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected AffinityFunction affinityFunction() { - return new FairAffinityFunction(true); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java deleted file mode 100644 index 95bf30c..0000000 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.fair; - -import java.util.Collection; -import java.util.List; -import java.util.TreeSet; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Tests partition fair affinity in real grid. - */ -public class FairAffinityFunctionNodesSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of backups. */ - private int backups; - - /** Number of partitions. */ - private int parts = 512; - - /** Add nodes test. */ - private static final boolean[] ADD_ONLY = new boolean[] {true, true, true, true, true, true}; - - /** Add nodes test. */ - private static final boolean[] ADD_REMOVE = new boolean[] - { - true, true, true, true, true, true, - false, false, false, false, false - }; - - /** */ - private static final boolean[] MIXED1 = new boolean[] - { - // 1 2 3 2 3 4 3 4 5 4 3 2 - true, true, true, false, true, true, false, true, true, false, false, false - }; - - /** */ - private static final boolean[] MIXED2 = new boolean[] - { - // 1 2 3 2 1 2 1 2 3 2 1 2 - true, true, true, false, false, true, false, true, true, false, false, true - }; - - /** */ - private static final boolean[] MIXED3 = new boolean[] - { - // 1 2 3 4 5 6 5 6 7 8 9 8 7 8 9 - true, true, true, true, true, true, false, true, true, true, true, false, false, true, true, - // 8 7 6 - false, false, false - }; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - CacheConfiguration ccfg = cacheConfiguration(); - - cfg.setCacheConfiguration(ccfg); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - return cfg; - } - - /** - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setBackups(backups); - - cfg.setCacheMode(CacheMode.PARTITIONED); - - cfg.setNearConfiguration(null); - - cfg.setAffinity(new FairAffinityFunction(parts)); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testAdd() throws Exception { - checkSequence(ADD_ONLY); - } - - /** - * @throws Exception If failed. - */ - public void testAddRemove() throws Exception { - checkSequence(ADD_REMOVE); - } - - /** - * @throws Exception If failed. - */ - public void testMixed1() throws Exception { - checkSequence(MIXED1); - } - - /** - * @throws Exception If failed. - */ - public void testMixed2() throws Exception { - checkSequence(MIXED2); - } - - /** - * @throws Exception If failed. - */ - public void testMixed3() throws Exception { - checkSequence(MIXED3); - } - - /** - * @throws Exception If failed. - */ - private void checkSequence(boolean[] seq) throws Exception { - for (int b = 0; b < 3; b++) { - backups = b; - - info(">>>>>>>>>>>>>>>> Checking backups: " + backups); - - checkSequence0(seq); - - info(">>>>>>>>>>>>>>>> Finished check: " + backups); - } - } - - /** - * @param seq Start/stop sequence. - * @throws Exception If failed. - */ - private void checkSequence0(boolean[] seq) throws Exception { - try { - startGrid(0); - - TreeSet<Integer> started = new TreeSet<>(); - - started.add(0); - - int topVer = 1; - - for (boolean start : seq) { - if (start) { - int nextIdx = nextIndex(started); - - startGrid(nextIdx); - - started.add(nextIdx); - } - else { - int idx = started.last(); - - stopGrid(idx); - - started.remove(idx); - } - - awaitPartitionMapExchange(); - - topVer++; - - info("Grid 0: " + grid(0).localNode().id()); - - ((IgniteKernal)grid(0)).internalCache().context().affinity().affinityReadyFuture(topVer).get(); - - for (int i : started) { - if (i != 0) { - IgniteEx grid = grid(i); - - ((IgniteKernal)grid).internalCache().context().affinity().affinityReadyFuture(topVer).get(); - - info("Grid " + i + ": " + grid.localNode().id()); - - for (int part = 0; part < parts; part++) { - List<ClusterNode> firstNodes = (List<ClusterNode>)grid(0).affinity(null) - .mapPartitionToPrimaryAndBackups(part); - - List<ClusterNode> secondNodes = (List<ClusterNode>)grid.affinity(null) - .mapPartitionToPrimaryAndBackups(part); - - assertEquals(firstNodes.size(), secondNodes.size()); - - for (int n = 0; n < firstNodes.size(); n++) - assertEquals(firstNodes.get(n), secondNodes.get(n)); - } - } - } - } - } - finally { - stopAllGrids(); - } - } - - /** - * First positive integer that is not present in started set. - * - * @param started Already started indices. - * @return First positive integer that is not present in started set. - */ - private int nextIndex(Collection<Integer> started) { - assert started.contains(0); - - for (int i = 1; i < 10000; i++) { - if (!started.contains(i)) - return i; - } - - throw new IllegalStateException(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java deleted file mode 100644 index a79c9fc..0000000 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.fair; - -import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest; -import org.apache.ignite.cache.affinity.AffinityFunction; - -/** - * Tests for {@link FairAffinityFunction}. - */ -public class FairAffinityFunctionSelfTest extends AbstractAffinityFunctionSelfTest { - /** {@inheritDoc} */ - @Override protected AffinityFunction affinityFunction() { - return new FairAffinityFunction(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityNodesRestart.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityNodesRestart.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityNodesRestart.java deleted file mode 100644 index 37f1bfb..0000000 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityNodesRestart.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.fair; - -import java.util.List; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.P2; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Tests that FairAffinityFunction doesn't throw exception on nodes restart, - * with backup filter set and 0 cache backups. - */ -public class FairAffinityNodesRestart extends GridCommonAbstractTest { - /** */ - private final static P2<ClusterNode, ClusterNode> BACKUP_FILTER = new P2<ClusterNode, ClusterNode>() { - @Override public boolean apply(ClusterNode node, ClusterNode node2) { - return true; - } - }; - - /** */ - private final static P2<ClusterNode, List<ClusterNode>> AFF_BACKUP_FILTER = new P2<ClusterNode, List<ClusterNode>>() { - @Override public boolean apply(ClusterNode node, List<ClusterNode> nodes) { - return true; - } - }; - - /** */ - private boolean affBackup; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration ccfg = new CacheConfiguration("fair-cache"); - - FairAffinityFunction aff = new FairAffinityFunction(32); - - if (!affBackup) - aff.setBackupFilter(BACKUP_FILTER); - else - aff.setAffinityBackupFilter(AFF_BACKUP_FILTER); - - ccfg.setAffinity(aff); - ccfg.setBackups(0); - ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); - - cfg.setCacheConfiguration(ccfg); - - return cfg; - } - - /** - * @param idx Node index. - * @return Future. - */ - private IgniteInternalFuture<IgniteEx> startAsyncGrid(final int idx) { - return GridTestUtils.runAsync(new Callable<IgniteEx>() { - @Override public IgniteEx call() throws Exception { - return startGrid(idx); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testBackupFilter() throws Exception { - affBackup = false; - - check(); - } - - /** - * @throws Exception If failed. - */ - public void testAffinityBackupFilter() throws Exception { - affBackup = true; - - check(); - } - - /** - * @throws Exception If failed. - */ - private void check() throws Exception { - for (int i = 0; i < 2; i++) { - IgniteInternalFuture<IgniteEx> fut0 = startAsyncGrid(0); - IgniteInternalFuture<IgniteEx> fut1 = startAsyncGrid(1); - IgniteInternalFuture<IgniteEx> fut2 = startAsyncGrid(2); - - IgniteEx ignite = fut0.get(); - fut1.get(); - fut2.get(); - - IgniteCache<Integer, String> cache = ignite.cache("fair-cache"); - - for (int j = 0; j < 100; j++) - cache.put(i, String.valueOf(i)); - - stopGrid(0); - stopGrid(1); - stopGrid(2); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9b64246/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java index b2847ea..fe3de71 100755 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.cache.affinity.local; import org.apache.ignite.Ignite; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -50,7 +50,7 @@ public class LocalAffinityFunctionTest extends GridCommonAbstractTest { ccfg.setBackups(1); ccfg.setName(CACHE1); ccfg.setCacheMode(CacheMode.LOCAL); - ccfg.setAffinity(new FairAffinityFunction()); + ccfg.setAffinity(new RendezvousAffinityFunction()); cfg.setCacheConfiguration(ccfg); return cfg;