The addition of the new ReplicaPosition.java file here appears to have broken (the documentation-lint part of) precommit.
[exec] missing description: org.apache.solr.common.cloud [exec] [exec] Missing javadocs were found! ----- Original Message ----- From: dev@lucene.apache.org To: comm...@lucene.apache.org At: 06/30/17 07:21:06 Repository: lucene-solr Updated Branches: refs/heads/master 0159d494f -> 15118d40c SOLR-1095: Refactor code to standardize replica assignment Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/196d84b9 Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/196d84b9 Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/196d84b9 Branch: refs/heads/master Commit: 196d84b9e08730e9af225450217032cf70d52b5a Parents: 0159d49 Author: Noble Paul <no...@apache.org> Authored: Fri Jun 30 15:49:40 2017 +0930 Committer: Noble Paul <no...@apache.org> Committed: Fri Jun 30 15:49:40 2017 +0930 ---------------------------------------------------------------------- .../src/java/org/apache/solr/cloud/Assign.java | 131 ++++++++++++++++--- .../apache/solr/cloud/CreateCollectionCmd.java | 31 ++--- .../java/org/apache/solr/cloud/RestoreCmd.java | 30 +++-- .../org/apache/solr/cloud/SplitShardCmd.java | 13 +- .../apache/solr/cloud/rule/ReplicaAssigner.java | 68 ++++------ .../solr/common/cloud/ReplicaPosition.java | 55 ++++++++ .../apache/solr/cloud/rule/RuleEngineTest.java | 8 +- 7 files changed, 234 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/Assign.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java index 9f21245..7c9752d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java @@ -25,10 +25,14 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import com.google.common.collect.ImmutableMap; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.impl.CloudSolrClient; @@ -40,7 +44,9 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; @@ -49,6 +55,11 @@ import org.apache.zookeeper.KeeperException; import static java.util.Collections.singletonMap; import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY; +import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET; +import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY; +import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE; +import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT; +import static org.apache.solr.common.cloud.DocCollection.SNITCH; import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH; @@ -119,7 +130,7 @@ public class Assign { returnShardId = shardIdNames.get(0); return returnShardId; } - + public static String buildCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) { // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum); @@ -141,6 +152,91 @@ public class Assign { else return replicaName; } } + public static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) { + // TODO: add smarter options that look at the current number of cores per + // node? + // for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in) + + List<String> nodeList; + + final String createNodeSetStr = message.getStr(CREATE_NODE_SET); + final List<String> createNodeList = (createNodeSetStr == null) ? null : StrUtils.splitSmart((CREATE_NODE_SET_EMPTY.equals(createNodeSetStr) ? "" : createNodeSetStr), ",", true); + + if (createNodeList != null) { + nodeList = new ArrayList<>(createNodeList); + nodeList.retainAll(liveNodes); + if (message.getBool(CREATE_NODE_SET_SHUFFLE, CREATE_NODE_SET_SHUFFLE_DEFAULT)) { + Collections.shuffle(nodeList, random); + } + } else { + nodeList = new ArrayList<>(liveNodes); + Collections.shuffle(nodeList, random); + } + + return nodeList; + } + + public static List<ReplicaPosition> identifyNodes(Supplier<CoreContainer> coreContainer, + ZkStateReader zkStateReader, + ClusterState clusterState, + List<String> nodeList, + String collectionName, + ZkNodeProps message, + List<String> shardNames, + int numNrtReplicas, + int numTlogReplicas, + int numPullReplicas) throws KeeperException, InterruptedException { + List<Map> rulesMap = (List) message.get("rule"); + String policyName = message.getStr(POLICY); + Map autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true); + + if (rulesMap == null && policyName == null) { + int i = 0; + List<ReplicaPosition> result = new ArrayList<>(); + for (String aShard : shardNames) { + + for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas, + Replica.Type.TLOG, numTlogReplicas, + Replica.Type.PULL, numPullReplicas + ).entrySet()) { + for (int j = 0; j < e.getValue(); j++){ + result.add(new ReplicaPosition(aShard, j, e.getKey(), nodeList.get(i % nodeList.size()))); + i++; + } + } + + } + return result; + } else { + if (numTlogReplicas + numPullReplicas != 0) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies"); + } + } + + if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) { + return getPositionsUsingPolicy(collectionName, + shardNames, numNrtReplicas, policyName, zkStateReader, nodeList); + } else { + List<Rule> rules = new ArrayList<>(); + for (Object map : rulesMap) rules.add(new Rule((Map) map)); + Map<String, Integer> sharVsReplicaCount = new HashMap<>(); + + for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas); + ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules, + sharVsReplicaCount, + (List<Map>) message.get(SNITCH), + new HashMap<>(),//this is a new collection. So, there are no nodes in any shard + nodeList, + coreContainer.get(), + clusterState); + + Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings(); + return nodeMappings.entrySet().stream() + .map(e -> new ReplicaPosition(e.getKey().shard, e.getKey().index, e.getKey().type, e.getValue())) + .collect(Collectors.toList()); + } + } static class ReplicaCount { public final String nodeName; @@ -191,21 +287,21 @@ public class Assign { } List l = (List) coll.get(DocCollection.RULE); - Map<ReplicaAssigner.Position, String> positions = null; + List<ReplicaPosition> replicaPositions = null; if (l != null) { - positions = getNodesViaRules(clusterState, shard, numberOfNodes, cc, coll, createNodeList, l); + replicaPositions = getNodesViaRules(clusterState, shard, numberOfNodes, cc, coll, createNodeList, l); } String policyName = coll.getStr(POLICY); Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true); if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) { - positions= Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes, + replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes, policyName, cc.getZkController().getZkStateReader(), createNodeList); } - if(positions != null){ + if(replicaPositions != null){ List<ReplicaCount> repCounts = new ArrayList<>(); - for (String s : positions.values()) { - repCounts.add(new ReplicaCount(s)); + for (ReplicaPosition p : replicaPositions) { + repCounts.add(new ReplicaCount(p.node)); } return repCounts; } @@ -215,9 +311,10 @@ public class Assign { return sortedNodeList; } - public static Map<ReplicaAssigner.Position, String> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas, - String policyName, ZkStateReader zkStateReader, - List<String> nodesList) throws KeeperException, InterruptedException { + + public static List<ReplicaPosition> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas, + String policyName, ZkStateReader zkStateReader, + List<String> nodesList) throws KeeperException, InterruptedException { try (CloudSolrClient csc = new CloudSolrClient.Builder() .withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader)) .build()) { @@ -226,11 +323,11 @@ public class Assign { Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName, autoScalingJson, clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas, nodesList); - Map<ReplicaAssigner.Position, String> result = new HashMap<>(); + List<ReplicaPosition> result = new ArrayList<>(); for (Map.Entry<String, List<String>> e : locations.entrySet()) { List<String> value = e.getValue(); for (int i = 0; i < value.size(); i++) { - result.put(new ReplicaAssigner.Position(e.getKey(), i, Replica.Type.NRT), value.get(i)); + result.add(new ReplicaPosition(e.getKey(), i, Replica.Type.NRT, value.get(i))); } } return result; @@ -239,8 +336,8 @@ public class Assign { } } - private static Map<ReplicaAssigner.Position, String> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes, - CoreContainer cc, DocCollection coll, List<String> createNodeList, List l) { + private static List<ReplicaPosition> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes, + CoreContainer cc, DocCollection coll, List<String> createNodeList, List l) { ArrayList<Rule> rules = new ArrayList<>(); for (Object o : l) rules.add(new Rule((Map) o)); Map<String, Map<String, Integer>> shardVsNodes = new LinkedHashMap<>(); @@ -253,18 +350,18 @@ public class Assign { n.put(replica.getNodeName(), ++count); } } - List snitches = (List) coll.get(DocCollection.SNITCH); + List snitches = (List) coll.get(SNITCH); List<String> nodesList = createNodeList == null ? new ArrayList<>(clusterState.getLiveNodes()) : createNodeList; - Map<ReplicaAssigner.Position, String> positions = new ReplicaAssigner( + Map<ReplicaPosition, String> positions = new ReplicaAssigner( rules, Collections.singletonMap(shard, numberOfNodes), snitches, shardVsNodes, nodesList, cc, clusterState).getNodeMappings(); - return positions;// getReplicaCounts(positions); + return positions.entrySet().stream().map(e -> e.getKey().setNode(e.getValue())).collect(Collectors.toList());// getReplicaCounts(positions); } private static HashMap<String, ReplicaCount> getNodeNameVsShardCount(String collectionName, http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java index 0c87658..7d3df70 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; import org.apache.solr.cloud.overseer.ClusterStateMutator; -import org.apache.solr.cloud.rule.ReplicaAssigner; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; @@ -131,12 +131,12 @@ public class CreateCollectionCmd implements Cmd { // add our new cores to existing nodes serving the least number of cores // but (for now) require that each core goes on a distinct node. - final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM); - Map<ReplicaAssigner.Position, String> positionVsNodes; + final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM); + List<ReplicaPosition> replicaPositions; if (nodeList.isEmpty()) { log.warn("It is unusual to create a collection ("+collectionName+") without cores."); - positionVsNodes = new HashMap<>(); + replicaPositions = new ArrayList<>(); } else { int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas; if (totalNumReplicas > nodeList.size()) { @@ -164,7 +164,9 @@ public class CreateCollectionCmd implements Cmd { + " shards to be created (higher than the allowed number)"); } - positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas); + replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(), + ocmh.zkStateReader + , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas); } ZkStateReader zkStateReader = ocmh.zkStateReader; @@ -207,12 +209,11 @@ public class CreateCollectionCmd implements Cmd { log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , nrtReplicas : {2}, tlogReplicas: {3}, pullReplicas: {4}", collectionName, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas)); Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>(); - for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) { - ReplicaAssigner.Position position = e.getKey(); - String nodeName = e.getValue(); - String coreName = Assign.buildCoreName(collectionName, position.shard, position.type, position.index + 1); + for (ReplicaPosition replicaPosition : replicaPositions) { + String nodeName = replicaPosition.node; + String coreName = Assign.buildCoreName(collectionName, replicaPosition.shard, replicaPosition.type, replicaPosition.index + 1); log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}" - , coreName, position.shard, collectionName, nodeName)); + , coreName, replicaPosition.shard, collectionName, nodeName)); String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName); @@ -222,11 +223,11 @@ public class CreateCollectionCmd implements Cmd { ZkNodeProps props = new ZkNodeProps( Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), ZkStateReader.COLLECTION_PROP, collectionName, - ZkStateReader.SHARD_ID_PROP, position.shard, + ZkStateReader.SHARD_ID_PROP, replicaPosition.shard, ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), - ZkStateReader.BASE_URL_PROP, baseUrl, - ZkStateReader.REPLICA_TYPE, position.type.name()); + ZkStateReader.BASE_URL_PROP, baseUrl, + ZkStateReader.REPLICA_TYPE, replicaPosition.type.name()); Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); } @@ -237,10 +238,10 @@ public class CreateCollectionCmd implements Cmd { params.set(CoreAdminParams.NAME, coreName); params.set(COLL_CONF, configName); params.set(CoreAdminParams.COLLECTION, collectionName); - params.set(CoreAdminParams.SHARD, position.shard); + params.set(CoreAdminParams.SHARD, replicaPosition.shard); params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); params.set(CoreAdminParams.NEW_COLLECTION, "true"); - params.set(CoreAdminParams.REPLICA_TYPE, position.type.name()); + params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name()); if (async != null) { String coreAdminAsyncId = async + Math.abs(System.nanoTime()); http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java index 6a18bff..fa493c7 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java @@ -28,12 +28,13 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; import org.apache.solr.cloud.overseer.OverseerAction; -import org.apache.solr.cloud.rule.ReplicaAssigner; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; @@ -108,7 +109,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection); // Get the Solr nodes to restore a collection. - final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList( + final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList( zkStateReader.getClusterState().getLiveNodes(), message, RANDOM); int numShards = backupCollectionState.getActiveSlices().size(); @@ -213,8 +214,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { List<String> sliceNames = new ArrayList<>(); restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName())); - Map<ReplicaAssigner.Position, String> positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, - restoreCollectionName, message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas); + List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(), + ocmh.zkStateReader, clusterState, + nodeList, restoreCollectionName, + message, sliceNames, + numNrtReplicas, numTlogReplicas, numPullReplicas); //Create one replica per shard and copy backed up data to it for (Slice slice : restoreCollection.getSlices()) { @@ -235,12 +239,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { // Get the first node matching the shard to restore in String node; - for (Map.Entry<ReplicaAssigner.Position, String> pvn : positionVsNodes.entrySet()) { - ReplicaAssigner.Position position = pvn.getKey(); - if (position.shard == slice.getName()) { - node = pvn.getValue(); + for (ReplicaPosition replicaPosition : replicaPositions) { + if (Objects.equals(replicaPosition.shard, slice.getName())) { + node = replicaPosition.node; propMap.put(CoreAdminParams.NODE, node); - positionVsNodes.remove(position); + replicaPositions.remove(replicaPosition); break; } } @@ -319,12 +322,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { // Get the first node matching the shard to restore in String node; - for (Map.Entry<ReplicaAssigner.Position, String> pvn : positionVsNodes.entrySet()) { - ReplicaAssigner.Position position = pvn.getKey(); - if (position.shard == slice.getName()) { - node = pvn.getValue(); + for (ReplicaPosition replicaPosition : replicaPositions) { + if (Objects.equals(replicaPosition.shard, slice.getName())) { + node = replicaPosition.node; propMap.put(CoreAdminParams.NODE, node); - positionVsNodes.remove(position); + replicaPositions.remove(replicaPosition); break; } } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java index 2e2e335..099190c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java @@ -30,7 +30,7 @@ import java.util.Set; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; import org.apache.solr.cloud.overseer.OverseerAction; -import org.apache.solr.cloud.rule.ReplicaAssigner; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.CompositeIdRouter; @@ -381,7 +381,8 @@ public class SplitShardCmd implements Cmd { // TODO: change this to handle sharding a slice into > 2 sub-shards. - Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState, + List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(), + ocmh.zkStateReader, clusterState, new ArrayList<>(clusterState.getLiveNodes()), collectionName, new ZkNodeProps(collection.getProperties()), @@ -389,10 +390,10 @@ public class SplitShardCmd implements Cmd { List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2); - for (Map.Entry<ReplicaAssigner.Position, String> entry : nodeMap.entrySet()) { - String sliceName = entry.getKey().shard; - String subShardNodeName = entry.getValue(); - String shardName = collectionName + "_" + sliceName + "_replica" + (entry.getKey().index); + for (ReplicaPosition replicaPosition : replicaPositions) { + String sliceName = replicaPosition.shard; + String subShardNodeName = replicaPosition.node; + String shardName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index); log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName + " on " + subShardNodeName); http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java index 669e82b..8887e53 100644 --- a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java +++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.rule.ImplicitSnitch; @@ -59,31 +60,6 @@ public class ReplicaAssigner { private Map<String, AtomicInteger> nodeVsCores = new HashMap<>(); - public static class Position implements Comparable<Position> { - public final String shard; - public final int index; - public final Replica.Type type; - - public Position(String shard, int replicaIdx, Replica.Type type) { - this.shard = shard; - this.index = replicaIdx; - this.type = type; - } - - @Override - public int compareTo(Position that) { - //this is to ensure that we try one replica from each shard first instead of - // all replicas from same shard - return that.index > index ? -1 : that.index == index ? 0 : 1; - } - - @Override - public String toString() { - return shard + ":" + index; - } - } - - /** * @param shardVsReplicaCount shard names vs no:of replicas required for each of those shards * @param snitches snitches details @@ -128,8 +104,8 @@ public class ReplicaAssigner { * For each shard return a new set of nodes where the replicas need to be created satisfying * the specified rule */ - public Map<Position, String> getNodeMappings() { - Map<Position, String> result = getNodeMappings0(); + public Map<ReplicaPosition, String> getNodeMappings() { + Map<ReplicaPosition, String> result = getNodeMappings0(); if (result == null) { String msg = "Could not identify nodes matching the rules " + rules; if (!failedNodes.isEmpty()) { @@ -149,7 +125,7 @@ public class ReplicaAssigner { } - Map<Position, String> getNodeMappings0() { + Map<ReplicaPosition, String> getNodeMappings0() { List<String> shardNames = new ArrayList<>(shardVsReplicaCount.keySet()); int[] shardOrder = new int[shardNames.size()]; for (int i = 0; i < shardNames.size(); i++) shardOrder[i] = i; @@ -168,17 +144,17 @@ public class ReplicaAssigner { } } - Map<Position, String> result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false); + Map<ReplicaPosition, String> result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false); if (result == null && hasFuzzyRules) { result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, true); } return result; } - private Map<Position, String> tryAllPermutations(List<String> shardNames, - int[] shardOrder, - int nonWildCardShardRules, - boolean fuzzyPhase) { + private Map<ReplicaPosition, String> tryAllPermutations(List<String> shardNames, + int[] shardOrder, + int nonWildCardShardRules, + boolean fuzzyPhase) { Iterator<int[]> shardPermutations = nonWildCardShardRules > 0 ? @@ -187,16 +163,16 @@ public class ReplicaAssigner { for (; shardPermutations.hasNext(); ) { int[] p = shardPermutations.next(); - List<Position> positions = new ArrayList<>(); + List<ReplicaPosition> replicaPositions = new ArrayList<>(); for (int pos : p) { for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) { - positions.add(new Position(shardNames.get(pos), j, Replica.Type.NRT)); + replicaPositions.add(new ReplicaPosition(shardNames.get(pos), j, Replica.Type.NRT)); } } - Collections.sort(positions); + Collections.sort(replicaPositions); for (Iterator<int[]> it = permutations(rules.size()); it.hasNext(); ) { int[] permutation = it.next(); - Map<Position, String> result = tryAPermutationOfRules(permutation, positions, fuzzyPhase); + Map<ReplicaPosition, String> result = tryAPermutationOfRules(permutation, replicaPositions, fuzzyPhase); if (result != null) return result; } } @@ -205,9 +181,9 @@ public class ReplicaAssigner { } - private Map<Position, String> tryAPermutationOfRules(int[] rulePermutation, List<Position> positions, boolean fuzzyPhase) { + private Map<ReplicaPosition, String> tryAPermutationOfRules(int[] rulePermutation, List<ReplicaPosition> replicaPositions, boolean fuzzyPhase) { Map<String, Map<String, Object>> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2); - Map<Position, String> result = new LinkedHashMap<>(); + Map<ReplicaPosition, String> result = new LinkedHashMap<>(); int startPosition = 0; Map<String, Map<String, Integer>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2); List<String> sortedLiveNodes = new ArrayList<>(this.participatingLiveNodes); @@ -232,7 +208,7 @@ public class ReplicaAssigner { return result1; }); forEachPosition: - for (Position position : positions) { + for (ReplicaPosition replicaPosition : replicaPositions) { //trying to assign a node by verifying each rule in this rulePermutation forEachNode: for (int j = 0; j < sortedLiveNodes.size(); j++) { @@ -242,16 +218,16 @@ public class ReplicaAssigner { Rule rule = rules.get(rulePermutation[i]); //trying to assign a replica into this node in this shard Rule.MatchStatus status = rule.tryAssignNodeToShard(liveNode, - copyOfCurrentState, nodeVsTagsCopy, position.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN); + copyOfCurrentState, nodeVsTagsCopy, replicaPosition.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN); if (status == Rule.MatchStatus.CANNOT_ASSIGN_FAIL) { continue forEachNode;//try another node for this position } } //We have reached this far means this node can be applied to this position //and all rules are fine. So let us change the currentState - result.put(position, liveNode); - Map<String, Integer> nodeNames = copyOfCurrentState.get(position.shard); - if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashMap<>()); + result.put(replicaPosition, liveNode); + Map<String, Integer> nodeNames = copyOfCurrentState.get(replicaPosition.shard); + if (nodeNames == null) copyOfCurrentState.put(replicaPosition.shard, nodeNames = new HashMap<>()); Integer n = nodeNames.get(liveNode); n = n == null ? 1 : n + 1; nodeNames.put(liveNode, n); @@ -267,11 +243,11 @@ public class ReplicaAssigner { return null; } - if (positions.size() > result.size()) { + if (replicaPositions.size() > result.size()) { return null; } - for (Map.Entry<Position, String> e : result.entrySet()) { + for (Map.Entry<ReplicaPosition, String> e : result.entrySet()) { for (int i = 0; i < rulePermutation.length; i++) { Rule rule = rules.get(rulePermutation[i]); Rule.MatchStatus matchStatus = rule.tryAssignNodeToShard(e.getValue(), http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java b/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java new file mode 100644 index 0000000..d64d1d1 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.common.cloud; + + +public class ReplicaPosition implements Comparable<ReplicaPosition> { + public final String shard; + public final int index; + public final Replica.Type type; + public String node; + + public ReplicaPosition(String shard, int replicaIdx, Replica.Type type) { + this.shard = shard; + this.index = replicaIdx; + this.type = type; + } + public ReplicaPosition(String shard, int replicaIdx, Replica.Type type, String node) { + this.shard = shard; + this.index = replicaIdx; + this.type = type; + this.node = node; + } + + @Override + public int compareTo(ReplicaPosition that) { + //this is to ensure that we try one replica from each shard first instead of + // all replicas from same shard + return that.index > index ? -1 : that.index == index ? 0 : 1; + } + + @Override + public String toString() { + return shard + ":" + index; + } + + public ReplicaPosition setNode(String node) { + this.node = node; + return this; + } +} http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java index 8b0a788..6d460ed 100644 --- a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java @@ -28,7 +28,7 @@ import java.util.Set; import com.google.common.collect.ImmutableList; import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.cloud.rule.ReplicaAssigner.Position; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.rule.Snitch; import org.apache.solr.common.cloud.rule.SnitchContext; @@ -73,7 +73,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{ "'replica':'1',shard:'*','node':'*'}," + " {'freedisk':'>1'}]"); - Map<Position, String> mapping = new ReplicaAssigner( + Map<ReplicaPosition, String> mapping = new ReplicaAssigner( rules, shardVsReplicaCount, singletonList(MockSnitch.class.getName()), new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null ).getNodeMappings(); @@ -147,7 +147,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{ "{node:'!127.0.0.1:49947_'}," + "{freedisk:'>1'}]"); Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); - Map<Position, String> mapping = new ReplicaAssigner( + Map<ReplicaPosition, String> mapping = new ReplicaAssigner( rules, shardVsReplicaCount, singletonList(MockSnitch.class.getName()), new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); @@ -236,7 +236,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{ "node5:80", makeMap("rack", "182") ); MockSnitch.nodeVsTags = nodeVsTags; - Map<Position, String> mapping = new ReplicaAssigner( + Map<ReplicaPosition, String> mapping = new ReplicaAssigner( rules, shardVsReplicaCount, singletonList(MockSnitch.class.getName()), new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0();