This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 24fe19177bc SOLR-17535: Migrate ClusterState.getCollectionsMap callers
to Stream (#2846)
24fe19177bc is described below
commit 24fe19177bc0ffe29d4ddcf8857ac83f57e0f58b
Author: David Smiley <[email protected]>
AuthorDate: Wed Nov 13 17:28:22 2024 -0500
SOLR-17535: Migrate ClusterState.getCollectionsMap callers to Stream (#2846)
Deprecated getCollectionsMap.
A refactoring, albeit some additional points:
* DeleteCollection: avoid NPE for non-existent collection (trivial change)
* ClusterStateUtil (test-framework): optimize waiting to use
ZkStateReader.waitForState for single-collection
---
.../java/org/apache/solr/cloud/ConfigSetCmds.java | 23 +-
.../solr/cloud/DistributedClusterStateUpdater.java | 2 +-
.../cloud/api/collections/DeleteCollectionCmd.java | 16 +-
.../api/collections/ReplicaMigrationUtils.java | 36 +--
.../apache/solr/cloud/overseer/NodeMutator.java | 29 +--
.../cluster/maintenance/InactiveShardRemover.java | 3 +-
.../impl/SimpleClusterAbstractionsImpl.java | 3 +-
.../apache/solr/handler/admin/ClusterStatus.java | 121 +++++----
.../solr/handler/admin/api/ListCollections.java | 17 +-
.../java/org/apache/solr/servlet/HttpSolrCall.java | 43 +--
.../apache/solr/cloud/CollectionsAPISolrJTest.java | 9 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 18 +-
.../SharedFileSystemAutoReplicaFailoverTest.java | 52 +---
.../solr/prometheus/scraper/SolrCloudScraper.java | 10 +-
.../prometheus/scraper/SolrCloudScraperTest.java | 10 +-
.../org/apache/solr/common/cloud/ClusterState.java | 6 +-
.../solr/cloud/AbstractFullDistribZkTestBase.java | 8 +-
.../apache/solr/common/cloud/ClusterStateUtil.java | 289 ++++++++-------------
18 files changed, 291 insertions(+), 404 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ConfigSetCmds.java
b/solr/core/src/java/org/apache/solr/cloud/ConfigSetCmds.java
index ee1c01f99d4..52219bbc2ea 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ConfigSetCmds.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ConfigSetCmds.java
@@ -31,7 +31,6 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ConfigSetParams;
@@ -184,16 +183,18 @@ public class ConfigSetCmds {
String configSetName, boolean force, CoreContainer coreContainer) throws
IOException {
ZkStateReader zkStateReader =
coreContainer.getZkController().getZkStateReader();
- for (Map.Entry<String, DocCollection> entry :
- zkStateReader.getClusterState().getCollectionsMap().entrySet()) {
- String configName = entry.getValue().getConfigName();
- if (configSetName.equals(configName))
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST,
- "Can not delete ConfigSet as it is currently being used by
collection ["
- + entry.getKey()
- + "]");
- }
+ zkStateReader
+ .getClusterState()
+ .forEachCollection(
+ state -> {
+ String configName = state.getConfigName();
+ if (configSetName.equals(configName))
+ throw new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST,
+ "Can not delete ConfigSet as it is currently being used by
collection ["
+ + state.getName()
+ + "]");
+ });
String propertyPath = ConfigSetProperties.DEFAULT_FILENAME;
NamedList<Object> properties =
diff --git
a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
index 7d47dd26bcb..3c7552790c9 100644
---
a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
+++
b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -975,7 +975,7 @@ public class DistributedClusterStateUpdater {
final DocCollection docCollection =
clusterState.getCollectionOrNull(collectionName);
Optional<ZkWriteCommand> result =
docCollection != null
- ? NodeMutator.computeCollectionUpdate(nodeName, collectionName,
docCollection, client)
+ ? NodeMutator.computeCollectionUpdate(nodeName, docCollection,
client)
: Optional.empty();
if (docCollection == null) {
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index a1e9012aaf0..1d8629c4fdc 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -168,21 +168,19 @@ public class DeleteCollectionCmd implements
CollApiCmds.CollectionApiCommand {
}
// delete related config set iff: it is auto generated AND not related
to any other collection
- String configSetName = coll.getConfigName();
+ String configSetName = coll == null ? null : coll.getConfigName();
if (ConfigSetsHandler.isAutoGeneratedConfigSet(configSetName)) {
boolean configSetIsUsedByOtherCollection = false;
// make sure the configSet is not shared with other collections
// Similar to what happens in: ConfigSetCmds::deleteConfigSet
- for (Map.Entry<String, DocCollection> entry :
- zkStateReader.getClusterState().getCollectionsMap().entrySet()) {
- String otherConfigSetName = entry.getValue().getConfigName();
- if (configSetName.equals(otherConfigSetName)) {
- configSetIsUsedByOtherCollection = true;
- break;
- }
- }
+ configSetIsUsedByOtherCollection =
+ zkStateReader
+ .getClusterState()
+ .collectionStream()
+ .map(DocCollection::getConfigName)
+ .anyMatch(configSetName::equals);
if (!configSetIsUsedByOtherCollection) {
// delete the config set
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
index 6013dea0c2d..d21aa3a5f95 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
@@ -34,9 +34,7 @@ import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
@@ -305,30 +303,20 @@ public class ReplicaMigrationUtils {
}
static List<Replica> getReplicasOfNodes(Collection<String> nodeNames,
ClusterState state) {
- List<Replica> sourceReplicas = new ArrayList<>();
- for (Map.Entry<String, DocCollection> e :
state.getCollectionsMap().entrySet()) {
- for (Slice slice : e.getValue().getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- if (nodeNames.contains(replica.getNodeName())) {
- sourceReplicas.add(replica);
- }
- }
- }
- }
- return sourceReplicas;
+ return state
+ .collectionStream()
+ .flatMap(dc -> dc.getSlices().stream())
+ .flatMap(s -> s.getReplicas().stream())
+ .filter(r -> nodeNames.contains(r.getNodeName()))
+ .toList();
}
static List<Replica> getReplicasOfNode(String nodeName, ClusterState state) {
- List<Replica> sourceReplicas = new ArrayList<>();
- for (Map.Entry<String, DocCollection> e :
state.getCollectionsMap().entrySet()) {
- for (Slice slice : e.getValue().getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- if (nodeName.equals(replica.getNodeName())) {
- sourceReplicas.add(replica);
- }
- }
- }
- }
- return sourceReplicas;
+ return state
+ .collectionStream()
+ .flatMap(dc -> dc.getSlices().stream())
+ .flatMap(s -> s.getReplicas().stream())
+ .filter(r -> nodeName.equals(r.getNodeName()))
+ .toList();
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index f54d2569ef5..48ce2e9fde2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -49,23 +49,12 @@ public class NodeMutator {
log.debug("DownNode state invoked for node: {}", nodeName);
- List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
-
- Map<String, DocCollection> collections = clusterState.getCollectionsMap();
- for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
- String collectionName = entry.getKey();
- DocCollection docCollection = entry.getValue();
- if (docCollection.isPerReplicaState()) continue;
-
- Optional<ZkWriteCommand> zkWriteCommand =
- computeCollectionUpdate(nodeName, collectionName, docCollection,
zkClient);
-
- if (zkWriteCommand.isPresent()) {
- zkWriteCommands.add(zkWriteCommand.get());
- }
- }
-
- return zkWriteCommands;
+ return clusterState
+ .collectionStream()
+ .filter(entry -> !entry.isPerReplicaState())
+ .map(docCollection -> computeCollectionUpdate(nodeName, docCollection,
zkClient))
+ .flatMap(Optional::stream)
+ .toList();
}
/**
@@ -77,7 +66,7 @@ public class NodeMutator {
* for an update to state.json, depending on the configuration of the
collection.
*/
public static Optional<ZkWriteCommand> computeCollectionUpdate(
- String nodeName, String collectionName, DocCollection docCollection,
SolrZkClient client) {
+ String nodeName, DocCollection docCollection, SolrZkClient client) {
boolean needToUpdateCollection = false;
List<String> downedReplicas = new ArrayList<>();
final Map<String, Slice> slicesCopy = new
LinkedHashMap<>(docCollection.getSlicesMap());
@@ -107,13 +96,13 @@ public class NodeMutator {
return Optional.of(
new ZkWriteCommand(
- collectionName,
+ docCollection.getName(),
docCollection.copyWithSlices(slicesCopy),
PerReplicaStatesOps.downReplicas(downedReplicas, prs),
false));
} else {
return Optional.of(
- new ZkWriteCommand(collectionName,
docCollection.copyWithSlices(slicesCopy)));
+ new ZkWriteCommand(docCollection.getName(),
docCollection.copyWithSlices(slicesCopy)));
}
} else {
// No update needed for this collection
diff --git
a/solr/core/src/java/org/apache/solr/cluster/maintenance/InactiveShardRemover.java
b/solr/core/src/java/org/apache/solr/cluster/maintenance/InactiveShardRemover.java
index 177663d1140..1d951492afd 100644
---
a/solr/core/src/java/org/apache/solr/cluster/maintenance/InactiveShardRemover.java
+++
b/solr/core/src/java/org/apache/solr/cluster/maintenance/InactiveShardRemover.java
@@ -143,7 +143,8 @@ public class InactiveShardRemover
void deleteInactiveSlices() {
final ClusterState clusterState =
coreContainer.getZkController().getClusterState();
Collection<Slice> inactiveSlices =
- clusterState.getCollectionsMap().values().stream()
+ clusterState
+ .collectionStream()
.flatMap(v -> collectInactiveSlices(v).stream())
.collect(Collectors.toSet());
diff --git
a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java
b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java
index cb86dc304e5..ece890b9b1f 100644
---
a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java
+++
b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java
@@ -93,7 +93,8 @@ class SimpleClusterAbstractionsImpl {
@Override
public Iterator<SolrCollection> iterator() {
- return clusterState.getCollectionsMap().values().stream()
+ return clusterState
+ .collectionStream()
.map(SolrCollectionImpl::fromDocCollection)
.collect(Collectors.toSet())
.iterator();
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
index 6c5998d17a2..18c8843f916 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
@@ -24,8 +24,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
@@ -43,6 +44,7 @@ import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
public class ClusterStatus {
+
private final ZkStateReader zkStateReader;
private final SolrParams solrParams;
private final String collection; // maybe null
@@ -178,79 +180,74 @@ public class ClusterStatus {
String routeKey = solrParams.get(ShardParams._ROUTE_);
String shard = solrParams.get(ZkStateReader.SHARD_ID_PROP);
- Map<String, DocCollection> collectionsMap = null;
+ Stream<DocCollection> collectionStream;
if (collection == null) {
- collectionsMap = clusterState.getCollectionsMap();
+ collectionStream = clusterState.collectionStream();
} else {
- collectionsMap =
- Collections.singletonMap(collection,
clusterState.getCollectionOrNull(collection));
- }
-
- boolean isAlias = aliasVsCollections.containsKey(collection);
- boolean didNotFindCollection = collectionsMap.get(collection) == null;
-
- if (didNotFindCollection && isAlias) {
- // In this case this.collection is an alias name not a collection
- // get all collections and filter out collections not in the alias
- // clusterState.getCollectionsMap() should be replaced with an
inexpensive call
- collectionsMap =
- clusterState.getCollectionsMap().entrySet().stream()
- .filter((entry) ->
aliasVsCollections.get(collection).contains(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
- }
-
- NamedList<Object> collectionProps = new SimpleOrderedMap<>();
-
- for (Map.Entry<String, DocCollection> entry : collectionsMap.entrySet()) {
- Map<String, Object> collectionStatus;
- String name = entry.getKey();
- DocCollection clusterStateCollection = entry.getValue();
- if (clusterStateCollection == null) {
- if (collection != null) {
+ DocCollection collState = clusterState.getCollectionOrNull(collection);
+ if (collState != null) {
+ collectionStream = Stream.of(collState);
+ } else { // couldn't find collection
+ // hopefully an alias...
+ if (!aliasVsCollections.containsKey(collection)) { // not an alias
either
SolrException solrException =
new SolrException(
- SolrException.ErrorCode.BAD_REQUEST, "Collection: " + name +
" not found");
+ SolrException.ErrorCode.BAD_REQUEST, "Collection: " +
collection + " not found");
solrException.setMetadata("CLUSTERSTATUS", "NOT_FOUND");
throw solrException;
- } else {
- // collection might have got deleted at the same time
- continue;
}
+ // In this case this.collection is an alias name not a collection
+ // Resolve them (not recursively but maybe should?).
+ collectionStream =
+ aliasVsCollections.get(collection).stream()
+ .map(clusterState::getCollectionOrNull)
+ .filter(Objects::nonNull);
}
+ }
- Set<String> requestedShards = new HashSet<>();
- if (routeKey != null) {
- DocRouter router = clusterStateCollection.getRouter();
- Collection<Slice> slices = router.getSearchSlices(routeKey, null,
clusterStateCollection);
- for (Slice slice : slices) {
- requestedShards.add(slice.getName());
- }
- }
- if (shard != null) {
- String[] paramShards = shard.split(",");
- requestedShards.addAll(Arrays.asList(paramShards));
- }
+ // TODO use an Iterable to stream the data to the client instead of
gathering it all in mem
- byte[] bytes = Utils.toJSON(clusterStateCollection);
- @SuppressWarnings("unchecked")
- Map<String, Object> docCollection = (Map<String, Object>)
Utils.fromJSON(bytes);
- collectionStatus = getCollectionStatus(docCollection, name,
requestedShards);
+ NamedList<Object> collectionProps = new SimpleOrderedMap<>();
- collectionStatus.put("znodeVersion",
clusterStateCollection.getZNodeVersion());
- collectionStatus.put(
- "creationTimeMillis",
clusterStateCollection.getCreationTime().toEpochMilli());
+ collectionStream.forEach(
+ clusterStateCollection -> {
+ Map<String, Object> collectionStatus;
+ String name = clusterStateCollection.getName();
+
+ Set<String> requestedShards = new HashSet<>();
+ if (routeKey != null) {
+ DocRouter router = clusterStateCollection.getRouter();
+ Collection<Slice> slices =
+ router.getSearchSlices(routeKey, null, clusterStateCollection);
+ for (Slice slice : slices) {
+ requestedShards.add(slice.getName());
+ }
+ }
+ if (shard != null) {
+ String[] paramShards = shard.split(",");
+ requestedShards.addAll(Arrays.asList(paramShards));
+ }
- if (collectionVsAliases.containsKey(name) &&
!collectionVsAliases.get(name).isEmpty()) {
- collectionStatus.put("aliases", collectionVsAliases.get(name));
- }
- String configName = clusterStateCollection.getConfigName();
- collectionStatus.put("configName", configName);
- if (solrParams.getBool("prs", false) &&
clusterStateCollection.isPerReplicaState()) {
- PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
- collectionStatus.put("PRS", prs);
- }
- collectionProps.add(name, collectionStatus);
- }
+ byte[] bytes = Utils.toJSON(clusterStateCollection);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> docCollection = (Map<String, Object>)
Utils.fromJSON(bytes);
+ collectionStatus = getCollectionStatus(docCollection, name,
requestedShards);
+
+ collectionStatus.put("znodeVersion",
clusterStateCollection.getZNodeVersion());
+ collectionStatus.put(
+ "creationTimeMillis",
clusterStateCollection.getCreationTime().toEpochMilli());
+
+ if (collectionVsAliases.containsKey(name) &&
!collectionVsAliases.get(name).isEmpty()) {
+ collectionStatus.put("aliases", collectionVsAliases.get(name));
+ }
+ String configName = clusterStateCollection.getConfigName();
+ collectionStatus.put("configName", configName);
+ if (solrParams.getBool("prs", false) &&
clusterStateCollection.isPerReplicaState()) {
+ PerReplicaStates prs =
clusterStateCollection.getPerReplicaStates();
+ collectionStatus.put("PRS", prs);
+ }
+ collectionProps.add(name, collectionStatus);
+ });
// now we need to walk the collectionProps tree to cross-check replica
state with live nodes
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps);
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/api/ListCollections.java
b/solr/core/src/java/org/apache/solr/handler/admin/api/ListCollections.java
index 4b54b86ae81..81c5fac32c5 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/api/ListCollections.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/ListCollections.java
@@ -20,10 +20,7 @@ package org.apache.solr.handler.admin.api;
import static
org.apache.solr.security.PermissionNameProvider.Name.COLL_READ_PERM;
import jakarta.inject.Inject;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.apache.solr.client.api.endpoint.ListCollectionsApi;
import org.apache.solr.client.api.model.ListCollectionsResponse;
import org.apache.solr.common.cloud.DocCollection;
@@ -51,10 +48,16 @@ public class ListCollections extends AdminAPIBase
implements ListCollectionsApi
instantiateJerseyResponse(ListCollectionsResponse.class);
validateZooKeeperAwareCoreContainer(coreContainer);
- Map<String, DocCollection> collections =
-
coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionsMap();
- List<String> collectionList = new ArrayList<>(collections.keySet());
- Collections.sort(collectionList);
+ // resolve each name to ensure it exists.
+ // TODO https://issues.apache.org/jira/browse/SOLR-16909 to go direct to
ZK?
+ List<String> collectionList =
+ coreContainer
+ .getZkController()
+ .getClusterState()
+ .collectionStream()
+ .map(DocCollection::getName)
+ .sorted()
+ .toList();
// XXX should we add aliases here?
response.collections = collectionList;
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 99291fc6ad0..7241be77390 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -40,6 +40,7 @@ import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
@@ -1089,22 +1090,15 @@ public class HttpSolrCall {
return core;
}
- private void getSlicesForCollections(
- ClusterState clusterState, Collection<Slice> slices, boolean
activeSlices) {
+ private List<Slice> getSlicesForAllCollections(ClusterState clusterState,
boolean activeSlices) {
+ // looks across *all* collections
if (activeSlices) {
- for (Map.Entry<String, DocCollection> entry :
clusterState.getCollectionsMap().entrySet()) {
- final Slice[] activeCollectionSlices =
entry.getValue().getActiveSlicesArr();
- if (activeCollectionSlices != null) {
- Collections.addAll(slices, activeCollectionSlices);
- }
- }
+ return clusterState
+ .collectionStream()
+ .flatMap(coll -> Arrays.stream(coll.getActiveSlicesArr()))
+ .toList();
} else {
- for (Map.Entry<String, DocCollection> entry :
clusterState.getCollectionsMap().entrySet()) {
- final Collection<Slice> collectionSlices =
entry.getValue().getSlices();
- if (collectionSlices != null) {
- slices.addAll(collectionSlices);
- }
- }
+ return clusterState.collectionStream().flatMap(coll ->
coll.getSlices().stream()).toList();
}
}
@@ -1113,20 +1107,20 @@ public class HttpSolrCall {
ClusterState clusterState = cores.getZkController().getClusterState();
final DocCollection docCollection =
clusterState.getCollectionOrNull(collectionName);
Slice[] slices = (docCollection != null) ?
docCollection.getActiveSlicesArr() : null;
- List<Slice> activeSlices = new ArrayList<>();
+ List<Slice> activeSlices;
boolean byCoreName = false;
int totalReplicas = 0;
if (slices == null) {
byCoreName = true;
- activeSlices = new ArrayList<>();
- getSlicesForCollections(clusterState, activeSlices, true);
+ // all collections!
+ activeSlices = getSlicesForAllCollections(clusterState, true);
if (activeSlices.isEmpty()) {
- getSlicesForCollections(clusterState, activeSlices, false);
+ activeSlices = getSlicesForAllCollections(clusterState, false);
}
} else {
- Collections.addAll(activeSlices, slices);
+ activeSlices = List.of(slices);
}
for (Slice s : activeSlices) {
@@ -1171,9 +1165,16 @@ public class HttpSolrCall {
boolean activeReplicas) {
String coreUrl;
Set<String> liveNodes = clusterState.getLiveNodes();
- Collections.shuffle(slices, Utils.RANDOM);
- for (Slice slice : slices) {
+ List<Slice> shuffledSlices;
+ if (slices.size() < 2) {
+ shuffledSlices = slices;
+ } else {
+ shuffledSlices = new ArrayList<>(slices);
+ Collections.shuffle(shuffledSlices, Utils.RANDOM);
+ }
+
+ for (Slice slice : shuffledSlices) {
List<Replica> randomizedReplicas = new ArrayList<>(slice.getReplicas());
Collections.shuffle(randomizedReplicas, Utils.RANDOM);
diff --git
a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index cba11e7d064..cf338640579 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -37,6 +37,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
@@ -1027,7 +1028,7 @@ public class CollectionsAPISolrJTest extends
SolrCloudTestCase {
delete.setFollowAliases(false);
delete.process(solrClient);
ClusterState state = solrClient.getClusterState();
- assertFalse(state.getCollectionsMap().toString(),
state.hasCollection(collectionName1));
+ assertFalse(collectionNamesString(state),
state.hasCollection(collectionName1));
// search should still work, returning results from collection 2
assertDoc(solrClient, collectionName1, "2"); // aliased
assertDoc(solrClient, collectionName2, "2"); // direct
@@ -1048,7 +1049,7 @@ public class CollectionsAPISolrJTest extends
SolrCloudTestCase {
state = solrClient.getClusterState();
// the collection is gone
- assertFalse(state.getCollectionsMap().toString(),
state.hasCollection(collectionName2));
+ assertFalse(collectionNamesString(state),
state.hasCollection(collectionName2));
// and the alias is gone
RetryUtil.retryUntil(
@@ -1067,6 +1068,10 @@ public class CollectionsAPISolrJTest extends
SolrCloudTestCase {
});
}
+ private static String collectionNamesString(ClusterState state) {
+ return
state.collectionStream().map(Object::toString).collect(Collectors.joining(","));
+ }
+
private void assertDoc(CloudSolrClient solrClient, String collection, String
id)
throws Exception {
QueryResponse rsp = solrClient.query(collection, params(CommonParams.Q,
"*:*"));
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 8e7e45333f4..bad8d58d021 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -1412,16 +1412,14 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader.forceUpdateCollection(COLLECTION);
ClusterState state = reader.getClusterState();
- int numFound = 0;
- Map<String, DocCollection> collectionsMap = state.getCollectionsMap();
- for (Map.Entry<String, DocCollection> entry : collectionsMap.entrySet())
{
- DocCollection collection = entry.getValue();
- for (Slice slice : collection.getSlices()) {
- if (slice.getReplicasMap().get("core_node1") != null) {
- numFound++;
- }
- }
- }
+ long numFound =
+ state
+ .collectionStream()
+ .map(DocCollection::getSlices)
+ .flatMap(Collection::stream)
+ .filter(slice -> slice.getReplicasMap().get("core_node1") !=
null)
+ .count();
+
assertEquals("Shard was found more than once in ClusterState", 1,
numFound);
} finally {
close(overseerClient);
diff --git
a/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/SharedFileSystemAutoReplicaFailoverTest.java
b/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/SharedFileSystemAutoReplicaFailoverTest.java
index 34e3b2a1492..68fd8715181 100644
---
a/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/SharedFileSystemAutoReplicaFailoverTest.java
+++
b/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/SharedFileSystemAutoReplicaFailoverTest.java
@@ -47,11 +47,8 @@ import
org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.ChaosMonkey;
-import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -382,44 +379,17 @@ public class SharedFileSystemAutoReplicaFailoverTest
extends AbstractFullDistrib
.filter(jetty -> jetty.getCoreContainer() != null)
.map(JettySolrRunner::getNodeName)
.collect(Collectors.toSet());
- long timeout =
- System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs,
TimeUnit.MILLISECONDS);
- boolean success = false;
- while (!success && System.nanoTime() < timeout) {
- success = true;
- ClusterState clusterState = zkStateReader.getClusterState();
- if (clusterState != null) {
- Map<String, DocCollection> collections =
clusterState.getCollectionsMap();
- for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
- DocCollection docCollection = entry.getValue();
- Collection<Slice> slices = docCollection.getSlices();
- for (Slice slice : slices) {
- // only look at active shards
- if (slice.getState() == Slice.State.ACTIVE) {
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- if (nodeNames.contains(replica.getNodeName())) {
- boolean live =
clusterState.liveNodesContain(replica.getNodeName());
- if (live) {
- success = false;
- }
- }
- }
- }
- }
- }
- if (!success) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Interrupted");
- }
- }
- }
- }
-
- return success;
+ return ClusterStateUtil.waitFor(
+ zkStateReader,
+ null,
+ timeoutInMs,
+ TimeUnit.MILLISECONDS,
+ (liveNodes, state) ->
+ ClusterStateUtil.replicasOfActiveSlicesStream(state)
+ .noneMatch(
+ replica ->
+ nodeNames.contains(replica.getNodeName())
+ && liveNodes.contains(replica.getNodeName())));
}
private void assertSliceAndReplicaCount(
diff --git
a/solr/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
b/solr/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
index e333076dbe2..79c3e46ae2c 100644
---
a/solr/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
+++
b/solr/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
@@ -57,10 +57,10 @@ public class SolrCloudScraper extends SolrScraper {
public Map<String, MetricSamples> pingAllCores(MetricsQuery query) throws
IOException {
Map<String, Http2SolrClient> httpSolrClients = createHttpSolrClients();
- Map<String, DocCollection> collectionState =
solrClient.getClusterState().getCollectionsMap();
-
List<Replica> replicas =
- collectionState.values().stream()
+ solrClient
+ .getClusterState()
+ .collectionStream()
.map(DocCollection::getReplicas)
.flatMap(List::stream)
.collect(Collectors.toList());
@@ -131,7 +131,9 @@ public class SolrCloudScraper extends SolrScraper {
}
private Set<String> getBaseUrls() throws IOException {
- return solrClient.getClusterState().getCollectionsMap().values().stream()
+ return solrClient
+ .getClusterState()
+ .collectionStream()
.map(DocCollection::getReplicas)
.flatMap(List::stream)
.map(Replica::getBaseUrl)
diff --git
a/solr/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java
b/solr/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java
index 2ebc3752cae..386b1252e04 100644
---
a/solr/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java
+++
b/solr/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java
@@ -115,17 +115,15 @@ public class SolrCloudScraperTest extends
PrometheusExporterTestBase {
Map<String, MetricSamples> allCoreMetrics =
solrCloudScraper.pingAllCores(configuration.getPingConfiguration().get(0));
- Map<String, DocCollection> collectionStates =
getClusterState().getCollectionsMap();
+ final List<DocCollection> collectionStates =
getClusterState().collectionStream().toList();
long coreCount =
- collectionStates.entrySet().stream()
- .mapToInt(entry -> entry.getValue().getReplicas().size())
- .sum();
+ collectionStates.stream().mapToInt(docColl ->
docColl.getReplicas().size()).sum();
assertEquals(coreCount, allCoreMetrics.size());
- for (Map.Entry<String, DocCollection> entry : collectionStates.entrySet())
{
- String coreName = entry.getValue().getReplicas().get(0).getCoreName();
+ for (DocCollection docColl : collectionStates) {
+ String coreName = docColl.getReplicas().get(0).getCoreName();
assertTrue(allCoreMetrics.containsKey(coreName));
List<Collector.MetricFamilySamples> coreMetrics =
allCoreMetrics.get(coreName).asList();
assertEquals(1, coreMetrics.size());
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 6219625dbd5..f4945921821 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -173,7 +173,9 @@ public class ClusterState implements MapWriter {
* semantics of how collection list is loaded have changed in SOLR-6629.
*
* @return a map of collection name vs DocCollection object
+ * @deprecated see {@link #collectionStream()}
*/
+ @Deprecated
public Map<String, DocCollection> getCollectionsMap() {
Map<String, DocCollection> result =
CollectionUtil.newHashMap(collectionStates.size());
for (Entry<String, CollectionRef> entry : collectionStates.entrySet()) {
@@ -415,8 +417,8 @@ public class ClusterState implements MapWriter {
}
/**
- * Streams the resolved {@link DocCollection}s. Use this sparingly in case
there are many
- * collections.
+ * Streams the resolved {@link DocCollection}s, which will often fetch from
ZooKeeper for each one
+ * for a many-collection scenario. Use this sparingly; some users have
thousands of collections!
*/
public Stream<DocCollection> collectionStream() {
return
collectionStates.values().stream().map(CollectionRef::get).filter(Objects::nonNull);
diff --git
a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 003ef951b0f..0a88523f13a 100644
---
a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++
b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -69,6 +69,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -105,8 +106,6 @@ import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.noggit.CharArr;
-import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2539,10 +2538,7 @@ public abstract class AbstractFullDistribZkTestBase
extends AbstractDistribZkTes
if (collection != null) {
cs = clusterState.getCollection(collection).toString();
} else {
- Map<String, DocCollection> map = clusterState.getCollectionsMap();
- CharArr out = new CharArr();
- new JSONWriter(out, 2).write(map);
- cs = out.toString();
+ cs = ClusterStateUtil.toDebugAllStatesString(clusterState);
}
return cs;
}
diff --git
a/solr/test-framework/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
b/solr/test-framework/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
index 9442f9e1d25..9ecfe57379d 100644
---
a/solr/test-framework/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
+++
b/solr/test-framework/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
@@ -16,18 +16,18 @@
*/
package org.apache.solr.common.cloud;
-import java.lang.invoke.MethodHandles;
import java.util.Collection;
-import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.solr.common.util.Utils;
public class ClusterStateUtil {
- private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int TIMEOUT_POLL_MS = 1000;
@@ -53,109 +53,21 @@ public class ClusterStateUtil {
*/
public static boolean waitForAllActiveAndLiveReplicas(
ZkStateReader zkStateReader, String collection, int timeoutInMs) {
- long timeout =
- System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs,
TimeUnit.MILLISECONDS);
- boolean success = false;
- while (!success && System.nanoTime() < timeout) {
- success = true;
- ClusterState clusterState = zkStateReader.getClusterState();
- if (clusterState != null) {
- Map<String, DocCollection> collections = null;
- if (collection != null) {
- collections =
- Collections.singletonMap(collection,
clusterState.getCollection(collection));
- } else {
- collections = clusterState.getCollectionsMap();
- }
- for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
- DocCollection docCollection = entry.getValue();
- Collection<Slice> slices = docCollection.getSlices();
- for (Slice slice : slices) {
- // only look at active shards
- if (slice.getState() == Slice.State.ACTIVE) {
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- // on a live node?
- final boolean live =
clusterState.liveNodesContain(replica.getNodeName());
- final boolean isActive = replica.getState() ==
Replica.State.ACTIVE;
- if (!live || !isActive) {
- // fail
- success = false;
- }
- }
- }
- }
- }
- if (!success) {
- try {
- Thread.sleep(TIMEOUT_POLL_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
- }
- }
- }
- }
-
- return success;
+ return waitFor(
+ zkStateReader,
+ collection,
+ timeoutInMs,
+ TimeUnit.MILLISECONDS,
+ (liveNodes, state) ->
+ replicasOfActiveSlicesStream(state)
+ .allMatch(replica -> liveAndActivePredicate(replica,
liveNodes)));
}
- /**
- * Wait to see an entry in the ClusterState with a specific coreNodeName and
baseUrl.
- *
- * @param zkStateReader to use for ClusterState
- * @param collection to look in
- * @param coreNodeName to wait for
- * @param baseUrl to wait for
- * @param timeoutInMs how long to wait before giving up
- * @return false if timed out
- */
- public static boolean waitToSeeLiveReplica(
- ZkStateReader zkStateReader,
- String collection,
- String coreNodeName,
- String baseUrl,
- int timeoutInMs) {
- long timeout =
- System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs,
TimeUnit.MILLISECONDS);
-
- while (System.nanoTime() < timeout) {
- log.debug(
- "waiting to see replica just created live collection={} replica={}
baseUrl={}",
- collection,
- coreNodeName,
- baseUrl);
- ClusterState clusterState = zkStateReader.getClusterState();
- if (clusterState != null) {
- DocCollection docCollection = clusterState.getCollection(collection);
- Collection<Slice> slices = docCollection.getSlices();
- for (Slice slice : slices) {
- // only look at active shards
- if (slice.getState() == Slice.State.ACTIVE) {
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- // on a live node?
- boolean live =
clusterState.liveNodesContain(replica.getNodeName());
- String rcoreNodeName = replica.getName();
- String rbaseUrl = replica.getBaseUrl();
- if (live && coreNodeName.equals(rcoreNodeName) &&
baseUrl.equals(rbaseUrl)) {
- // found it
- return true;
- }
- }
- }
- }
- try {
- Thread.sleep(TIMEOUT_POLL_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
- }
- }
- }
-
- log.error("Timed out waiting to see replica just created in cluster state.
Continuing...");
- return false;
+ private static boolean liveAndActivePredicate(Replica replica, Set<String>
liveNodes) {
+ // on a live node?
+ final boolean live = liveNodes.contains(replica.getNodeName());
+ final boolean isActive = replica.getState() == Replica.State.ACTIVE;
+ return live && isActive;
}
public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader,
int timeoutInMs) {
@@ -164,86 +76,111 @@ public class ClusterStateUtil {
public static boolean waitForAllReplicasNotLive(
ZkStateReader zkStateReader, String collection, int timeoutInMs) {
- long timeout =
- System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs,
TimeUnit.MILLISECONDS);
- boolean success = false;
- while (!success && System.nanoTime() < timeout) {
- success = true;
- ClusterState clusterState = zkStateReader.getClusterState();
- if (clusterState != null) {
- Map<String, DocCollection> collections = null;
- if (collection != null) {
- collections =
- Collections.singletonMap(collection,
clusterState.getCollection(collection));
- } else {
- collections = clusterState.getCollectionsMap();
- }
- for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
- DocCollection docCollection = entry.getValue();
- Collection<Slice> slices = docCollection.getSlices();
- for (Slice slice : slices) {
- // only look at active shards
- if (slice.getState() == Slice.State.ACTIVE) {
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- // on a live node?
- boolean live =
clusterState.liveNodesContain(replica.getNodeName());
- if (live) {
- // fail
- success = false;
- }
- }
- }
- }
- }
- if (!success) {
- try {
- Thread.sleep(TIMEOUT_POLL_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
- }
- }
- }
- }
-
- return success;
+ return waitFor(
+ zkStateReader,
+ collection,
+ timeoutInMs,
+ TimeUnit.MILLISECONDS,
+ (liveNodes, state) ->
+ replicasOfActiveSlicesStream(state)
+ .noneMatch(replica ->
liveNodes.contains(replica.getNodeName())));
}
public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader,
String collection) {
- Slice[] slices;
- slices =
zkStateReader.getClusterState().getCollection(collection).getActiveSlicesArr();
- int liveAndActive = 0;
- for (Slice slice : slices) {
- for (Replica replica : slice.getReplicas()) {
- boolean live =
zkStateReader.getClusterState().liveNodesContain(replica.getNodeName());
- boolean active = replica.getState() == Replica.State.ACTIVE;
- if (live && active) {
- liveAndActive++;
- }
- }
- }
- return liveAndActive;
+ ClusterState clusterState = zkStateReader.getClusterState();
+ var liveNodes = clusterState.getLiveNodes();
+ var state = clusterState.getCollection(collection);
+ return (int)
+ replicasOfActiveSlicesStream(state)
+ .filter(replica -> liveAndActivePredicate(replica, liveNodes))
+ .count();
+ }
+
+ public static Stream<Replica> replicasOfActiveSlicesStream(DocCollection
collectionState) {
+ return collectionState.getActiveSlices().stream()
+ .map(Slice::getReplicas)
+ .flatMap(Collection::stream);
}
public static boolean waitForLiveAndActiveReplicaCount(
ZkStateReader zkStateReader, String collection, int replicaCount, int
timeoutInMs) {
- long timeout =
- System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs,
TimeUnit.MILLISECONDS);
- boolean success = false;
- while (!success && System.nanoTime() < timeout) {
- success = getLiveAndActiveReplicaCount(zkStateReader, collection) ==
replicaCount;
-
- if (!success) {
- try {
- Thread.sleep(TIMEOUT_POLL_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+ return waitFor(
+ zkStateReader,
+ collection,
+ timeoutInMs,
+ TimeUnit.MILLISECONDS,
+ (liveNodes, state) ->
+ replicasOfActiveSlicesStream(state)
+ .filter(replica -> liveAndActivePredicate(replica,
liveNodes))
+ .count()
+ == replicaCount);
+ }
+
+ /**
+ * Calls {@link ZkStateReader#waitForState(String, long, TimeUnit,
CollectionStatePredicate)} but
+ * has an alternative implementation if {@code collection} is null, in which
the predicate must
+ * match *all* collections. Returns whether the predicate matches or not in
the allotted time;
+ * does *NOT* throw {@link TimeoutException}.
+ */
+ public static boolean waitFor(
+ ZkStateReader zkStateReader,
+ String collection,
+ long timeout,
+ TimeUnit timeUnit,
+ CollectionStatePredicate predicate) {
+ // ideally a collection is specified...
+ if (collection != null) {
+ try {
+ zkStateReader.waitForState(collection, timeout, timeUnit, predicate);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+ } catch (TimeoutException e) {
+ return false;
+ }
+ }
+
+ // otherwise we check all collections...
+
+ final long timeoutAtNs = System.nanoTime() +
TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
+ while (true) {
+ ClusterState clusterState = zkStateReader.getClusterState(); // fresh
state
+ if (clusterState != null) { // it's sad to deal with this; API contract
should forbid
+ var liveNodes = clusterState.getLiveNodes();
+ if (clusterState
+ .collectionStream()
+ .allMatch(state -> predicate.matches(liveNodes, state))) {
+ return true;
}
}
+
+ if (System.nanoTime() > timeoutAtNs) {
+ return false;
+ }
+
+ try {
+ Thread.sleep(TIMEOUT_POLL_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+ }
}
+ }
- return success;
+ /** Produces a String of all the collection states for debugging. ZK may be
consulted. */
+ public static String toDebugAllStatesString(ClusterState clusterState) {
+ // note: ClusterState.toString prints the in-memory state info it has
without consulting ZK
+
+ // Collect to a Map by name, loading each DocCollection expressed as a Map
+ var stateMap =
+ clusterState
+ .collectionStream()
+ .collect(
+ LinkedHashMap::new,
+ (map, state) -> map.put(state.getName(), state.toMap(new
LinkedHashMap<>())),
+ Map::putAll);
+ // toJSON requires standard types like Map; doesn't know about
DocCollection etc.
+ return Utils.toJSONString(stateMap);
}
}