murblanc commented on code in PR #2846:
URL: https://github.com/apache/solr/pull/2846#discussion_r1830057908


##########
solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java:
##########
@@ -175,14 +175,12 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu
 
         // make sure the configSet is not shared with other collections
         // Similar to what happens in: ConfigSetCmds::deleteConfigSet
-        for (Map.Entry<String, DocCollection> entry :
-            zkStateReader.getClusterState().getCollectionsMap().entrySet()) {
-          String otherConfigSetName = entry.getValue().getConfigName();
-          if (configSetName.equals(otherConfigSetName)) {
-            configSetIsUsedByOtherCollection = true;
-            break;
-          }
-        }
+        configSetIsUsedByOtherCollection =
+            zkStateReader
+                .getClusterState()
+                .collectionStream()
+                .map(DocCollection::getConfigName)
+                .anyMatch(cf -> cf.equals(configSetName));

Review Comment:
   Maybe call `equals()` in the other direction, i.e. `configSetName.equals(cf)` as the removed loop did, to prevent an NPE if `cf` is `null`?
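
   Something like the following, as a minimal sketch only: the class and method names are hypothetical, plain strings stand in for the config names, and `configSetName` is assumed to be non-null.

```java
import java.util.Arrays;
import java.util.List;

final class ConfigSetUsageSketch {
  private ConfigSetUsageSketch() {}

  /**
   * Returns true if any element equals configSetName.
   * Assumption: configSetName is non-null; elements of configNames may be null.
   */
  static boolean isConfigSetUsed(List<String> configNames, String configSetName) {
    // equals() is invoked on the known non-null value, so a null element
    // simply compares as "not equal" instead of throwing an NPE.
    return configNames.stream().anyMatch(cf -> configSetName.equals(cf));
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("conf1", null, "conf2");
    System.out.println(isConfigSetUsed(names, "conf2"));
    System.out.println(isConfigSetUsed(names, "other"));
  }
}
```

   If `configSetName` could itself be `null`, `Objects.equals(cf, configSetName)` would cover both sides.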



##########
solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java:
##########
@@ -1067,6 +1068,10 @@ public void testDeleteAliasedCollection() throws Exception {
         });
   }
 
+  private static String toString(ClusterState state) {

Review Comment:
   I'd name this `collectionNames`, or something else more meaningful than `toString`.



##########
solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java:
##########
@@ -178,79 +180,71 @@ private void fetchClusterStatusForCollOrAlias(
     String routeKey = solrParams.get(ShardParams._ROUTE_);
     String shard = solrParams.get(ZkStateReader.SHARD_ID_PROP);
 
-    Map<String, DocCollection> collectionsMap = null;
-    if (collection == null) {
-      collectionsMap = clusterState.getCollectionsMap();
+    Stream<DocCollection> collectionStream;
+    if (collection == null) { // uh-oh; hopefully not a lot

Review Comment:
   Unrelated, but this method was already hard to read, and I find the changes below make it even harder to read and maintain.
   Maybe it's just me, though.



##########
solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java:
##########
@@ -49,23 +49,14 @@ public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps mess
 
     log.debug("DownNode state invoked for node: {}", nodeName);
 
-    List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
-
-    Map<String, DocCollection> collections = clusterState.getCollectionsMap();
-    for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-      String collectionName = entry.getKey();
-      DocCollection docCollection = entry.getValue();
-      if (docCollection.isPerReplicaState()) continue;
-
-      Optional<ZkWriteCommand> zkWriteCommand =
-          computeCollectionUpdate(nodeName, collectionName, docCollection, zkClient);
-
-      if (zkWriteCommand.isPresent()) {
-        zkWriteCommands.add(zkWriteCommand.get());
-      }
-    }
-
-    return zkWriteCommands;
+    return clusterState
+        .collectionStream()
+        .filter(entry -> !entry.isPerReplicaState())
+        .map(
+            docCollection ->
+                computeCollectionUpdate(nodeName, docCollection.getName(), docCollection, zkClient))

Review Comment:
   Maybe change the signature of `computeCollectionUpdate()` so that it gets the name from the passed-in `DocCollection`, saving one argument here?
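
   A rough sketch of what the narrower signature might look like. Everything here is a stand-in, not the actual Solr code: `Coll` is a hypothetical replacement for `DocCollection`, a `String` stands in for `ZkWriteCommand`, the `zkClient` argument is left out, and the rule for when an update is produced is invented for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class ComputeUpdateSketch {
  private ComputeUpdateSketch() {}

  /** Hypothetical stand-in for DocCollection. */
  static final class Coll {
    private final String name;
    private final boolean perReplicaState;
    private final List<String> nodes;

    Coll(String name, boolean perReplicaState, List<String> nodes) {
      this.name = name;
      this.perReplicaState = perReplicaState;
      this.nodes = nodes;
    }

    String getName() { return name; }
    boolean isPerReplicaState() { return perReplicaState; }
    List<String> getNodes() { return nodes; }
  }

  // Narrowed signature: the collection name is read from the collection itself,
  // so callers no longer pass it separately.
  static Optional<String> computeCollectionUpdate(String nodeName, Coll collection) {
    // Invented rule: only collections with a replica on the node need an update.
    if (!collection.getNodes().contains(nodeName)) {
      return Optional.empty();
    }
    return Optional.of(collection.getName());
  }

  static List<String> downNode(String nodeName, List<Coll> collections) {
    return collections.stream()
        .filter(c -> !c.isPerReplicaState())
        .map(c -> computeCollectionUpdate(nodeName, c))
        .flatMap(Optional::stream) // Java 9+: drops the empty results
        .collect(Collectors.toList());
  }
}
```

   The call site then shrinks to a single lambda argument per collection, and the name can no longer disagree with the collection it is supposed to describe.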



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java:
##########
@@ -411,28 +417,19 @@ public Set<String> getHostAllowList() {
   }
 
   /**
-   * Iterate over collections. Unlike {@link #getCollectionStates()} collections passed to the
-   * consumer are guaranteed to exist.
-   *
-   * @param consumer collection consumer.
+   * Streams the resolved {@link DocCollection}s, which will often fetch from ZooKeeper for each one
+   * for a many-collection scenario. Use this sparingly; some users have thousands of collections!
+   */
+  public Stream<DocCollection> collectionStream() {
+    return collectionStates.values().stream().map(CollectionRef::get).filter(Objects::nonNull);
+  }
+  }
+
+  /**
+   * Calls {@code consumer} with a resolved {@link DocCollection}s for all collections. Use this
+   * sparingly in case there are many collections.
    */
   public void forEachCollection(Consumer<DocCollection> consumer) {

Review Comment:
   I wonder if we should keep this method now that we have `collectionStream()`.
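
   If it is kept, it could presumably become a one-line wrapper over the stream. A sketch under assumptions: `LazyStateSketch` is a hypothetical class, `Supplier<String>` stands in for `CollectionRef`, and a `String` stands in for `DocCollection`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class LazyStateSketch {
  // Hypothetical stand-in for the map of lazily resolved collection references.
  // A supplier may return null when its collection no longer exists.
  private final Map<String, Supplier<String>> refs = new LinkedHashMap<>();

  void put(String name, Supplier<String> ref) {
    refs.put(name, ref);
  }

  /** Resolves each reference on demand and skips those that resolve to null. */
  Stream<String> collectionStream() {
    return refs.values().stream().map(Supplier::get).filter(Objects::nonNull);
  }

  /** Same traversal as collectionStream(), expressed as a callback. */
  void forEachCollection(Consumer<String> consumer) {
    collectionStream().forEach(consumer);
  }
}
```

   With that delegation the two methods cannot drift apart, which also makes it cheap to drop `forEachCollection` later if nothing ends up calling it.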



##########
solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java:
##########
@@ -178,79 +180,71 @@ private void fetchClusterStatusForCollOrAlias(
     String routeKey = solrParams.get(ShardParams._ROUTE_);
     String shard = solrParams.get(ZkStateReader.SHARD_ID_PROP);
 
-    Map<String, DocCollection> collectionsMap = null;
-    if (collection == null) {
-      collectionsMap = clusterState.getCollectionsMap();
+    Stream<DocCollection> collectionStream;
+    if (collection == null) { // uh-oh; hopefully not a lot

Review Comment:
   I assume you mean that cluster status is going to return a lot of data and we don't like that, but the code comment doesn't really help the reader understand it. (And if returning everything is the actual need, it's not an issue, just a price to pay.)



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java:
##########
@@ -172,7 +173,9 @@ public Collection<String> getCollectionNames() {
    * semantics of how collection list is loaded have changed in SOLR-6629.
    *
    * @return a map of collection name vs DocCollection object
+   * @deprecated see {@link #collectionStream()}
    */
+  @Deprecated

Review Comment:
   Are there still uses of `getCollectionsMap()` in the code? If not, I suggest removing this method rather than deprecating it. This is not a public API, right?
   If references remain, why not remove them?
   Same comment for `getCollectionStates()` below.



##########
solr/test-framework/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java:
##########
@@ -164,86 +72,83 @@ public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader, int
 
   public static boolean waitForAllReplicasNotLive(
       ZkStateReader zkStateReader, String collection, int timeoutInMs) {
-    long timeout =
-        System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
-    boolean success = false;
-    while (!success && System.nanoTime() < timeout) {
-      success = true;
-      ClusterState clusterState = zkStateReader.getClusterState();
-      if (clusterState != null) {
-        Map<String, DocCollection> collections = null;
-        if (collection != null) {
-          collections =
-              Collections.singletonMap(collection, clusterState.getCollection(collection));
-        } else {
-          collections = clusterState.getCollectionsMap();
-        }
-        for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-          DocCollection docCollection = entry.getValue();
-          Collection<Slice> slices = docCollection.getSlices();
-          for (Slice slice : slices) {
-            // only look at active shards
-            if (slice.getState() == Slice.State.ACTIVE) {
-              Collection<Replica> replicas = slice.getReplicas();
-              for (Replica replica : replicas) {
-                // on a live node?
-                boolean live = clusterState.liveNodesContain(replica.getNodeName());
-                if (live) {
-                  // fail
-                  success = false;
-                }
-              }
-            }
-          }
-        }
-        if (!success) {
-          try {
-            Thread.sleep(TIMEOUT_POLL_MS);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
-          }
-        }
-      }
-    }
-
-    return success;
+    return waitFor(
+        zkStateReader,
+        timeoutInMs,
+        collection,
+        (liveNodes, state) ->
+            replicasOfActiveSlicesStream(state)
+                .noneMatch(replica -> liveNodes.contains(replica.getNodeName())));
   }
 
   public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) {
-    Slice[] slices;
-    slices = zkStateReader.getClusterState().getCollection(collection).getActiveSlicesArr();
-    int liveAndActive = 0;
-    for (Slice slice : slices) {
-      for (Replica replica : slice.getReplicas()) {
-        boolean live = zkStateReader.getClusterState().liveNodesContain(replica.getNodeName());
-        boolean active = replica.getState() == Replica.State.ACTIVE;
-        if (live && active) {
-          liveAndActive++;
-        }
-      }
-    }
-    return liveAndActive;
+    ClusterState clusterState = zkStateReader.getClusterState();
+    var liveNodes = clusterState.getLiveNodes();
+    var state = clusterState.getCollection(collection);
+    return (int)
+        replicasOfActiveSlicesStream(state)
+            .filter(replica -> liveAndActivePredicate(replica, liveNodes))
+            .count();
+  }
+
+  public static Stream<Replica> replicasOfActiveSlicesStream(DocCollection collectionState) {
+    return collectionState.getActiveSlices().stream()
+        .map(Slice::getReplicas)
+        .flatMap(Collection::stream);
   }
 
   public static boolean waitForLiveAndActiveReplicaCount(
       ZkStateReader zkStateReader, String collection, int replicaCount, int timeoutInMs) {
-    long timeout =
-        System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
-    boolean success = false;
-    while (!success && System.nanoTime() < timeout) {
-      success = getLiveAndActiveReplicaCount(zkStateReader, collection) == replicaCount;
+    return waitFor(
+        zkStateReader,
+        timeoutInMs,
+        collection,
+        (liveNodes, state) ->
+            replicasOfActiveSlicesStream(state)
+                    .filter(replica -> liveAndActivePredicate(replica, liveNodes))
+                    .count()
+                == replicaCount);
+  }
+
+  public static boolean waitFor(
+      ZkStateReader zkStateReader,
+      long timeoutMs,
+      String collection,
+      CollectionStatePredicate predicate) {
+    if (collection != null) {
+      try {
+        zkStateReader.waitForState(collection, timeoutMs, TimeUnit.MILLISECONDS, predicate);
+        return true;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+      } catch (TimeoutException e) {
+        return false;
+      }
+    }
+    // TODO first timeout on clusterState existing (add method to ZkStateReader?) then
 
-      if (!success) {
-        try {
-          Thread.sleep(TIMEOUT_POLL_MS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+    final long timeoutAtNs =
+        System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
+    while (System.nanoTime() < timeoutAtNs) {

Review Comment:
   Maybe change this into a `do...while` so that:
   - It runs at least once even if `timeoutMs` is 0 (or is non-zero but an improbable GC pause hits at the wrong moment)
   - It doesn't `sleep` and then fail directly, without checking again whether `predicate` matches, once the timeout is exceeded
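
   One possible reading of this, as a sketch rather than the actual change: `PollSketch` and its parameters are hypothetical, a `BooleanSupplier` stands in for evaluating the `CollectionStatePredicate` against the current cluster state, and `IllegalStateException` stands in for `SolrException`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class PollSketch {
  private PollSketch() {}

  /** Polls condition until it holds or timeoutMs has elapsed. */
  static boolean waitFor(long timeoutMs, long pollMs, BooleanSupplier condition) {
    final long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    boolean expired;
    do {
      // Checked at least once, and checked again after every sleep.
      if (condition.getAsBoolean()) {
        return true;
      }
      // Comparing via subtraction stays correct if nanoTime() overflows.
      expired = System.nanoTime() - deadlineNs >= 0;
      if (!expired) {
        try {
          Thread.sleep(pollMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted", e);
        }
      }
    } while (!expired);
    return false;
  }
}
```

   Because the deadline is only consulted after a failed check, a sleep is never the last thing that happens before returning `false`.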



##########
solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java:
##########
@@ -1094,20 +1094,19 @@ private SolrCore checkProps(ZkNodeProps zkProps) {
 
   private void getSlicesForCollections(
       ClusterState clusterState, Collection<Slice> slices, boolean activeSlices) {
+
     if (activeSlices) {
-      for (Map.Entry<String, DocCollection> entry : clusterState.getCollectionsMap().entrySet()) {
-        final Slice[] activeCollectionSlices = entry.getValue().getActiveSlicesArr();
-        if (activeCollectionSlices != null) {
-          Collections.addAll(slices, activeCollectionSlices);
-        }
-      }
+      clusterState
+          .collectionStream()
+          .map(DocCollection::getActiveSlicesArr)
+          .filter(Objects::nonNull)
+          .forEach(them -> Collections.addAll(slices, them));

Review Comment:
   Maybe use a different variable name? `them` doesn't convey much meaning.



##########
solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/SharedFileSystemAutoReplicaFailoverTest.java:
##########
@@ -383,44 +381,16 @@ private boolean waitingForReplicasNotLive(
             .filter(jetty -> jetty.getCoreContainer() != null)
             .map(JettySolrRunner::getNodeName)
             .collect(Collectors.toSet());
-    long timeout =
-        System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
-    boolean success = false;
-    while (!success && System.nanoTime() < timeout) {
-      success = true;
-      ClusterState clusterState = zkStateReader.getClusterState();
-      if (clusterState != null) {
-        Map<String, DocCollection> collections = clusterState.getCollectionsMap();
-        for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-          DocCollection docCollection = entry.getValue();
-          Collection<Slice> slices = docCollection.getSlices();
-          for (Slice slice : slices) {
-            // only look at active shards
-            if (slice.getState() == Slice.State.ACTIVE) {
-              Collection<Replica> replicas = slice.getReplicas();
-              for (Replica replica : replicas) {
-                if (nodeNames.contains(replica.getNodeName())) {
-                  boolean live = clusterState.liveNodesContain(replica.getNodeName());
-                  if (live) {
-                    success = false;
-                  }
-                }
-              }
-            }
-          }
-        }
-        if (!success) {
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted");
-          }
-        }
-      }
-    }
-
-    return success;
+    return ClusterStateUtil.waitFor(

Review Comment:
   Nice cleanup!




