madrob commented on code in PR #909:
URL: https://github.com/apache/solr/pull/909#discussion_r922256956


##########
solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java:
##########
@@ -39,4 +45,31 @@ public static void setAdditionalProps(Map<String, String> 
additionalSystemProps)
   public static Map<String, String> injectAdditionalProps() {
     return additionalSystemProps;
   }
+
+  /**
+   * Set test delay (sleep) in unit of millisec
+   *
+   * @param delay delay in millisec, null to remove such delay
+   */
+  public static void setDelay(Integer delay) {
+    CommonTestInjection.delay = delay;
+  }
+
+  /**
+   * Inject an artificial delay(sleep) into the code
+   *
+   * @return true
+   */
+  public static boolean injectDelay() {
+    if (delay != null) {
+      try {
+        log.info("Start: artificial delay for {}ms", delay);
+        Thread.sleep(delay);
+        log.info("Finish: artificial delay for {}ms", delay);

Review Comment:
   nit: move this "Finish" log statement into a `finally` block so that we still log even if the sleep is interrupted.



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -1919,9 +2051,9 @@ public void removeDocCollectionWatcher(String collection, 
DocCollectionWatcher w
           if (v == null) return null;
           v.stateWatchers.remove(watcher);
           if (v.canBeRemoved()) {
-            watchedCollectionStates.remove(collection);
             lazyCollectionStates.put(collection, new 
LazyCollectionRef(collection));
             reconstructState.set(true);
+            CommonTestInjection.injectDelay(); // To unit test race condition

Review Comment:
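   Our common pattern is to hide this behind a Java `assert` (e.g. `assert CommonTestInjection.injectDelay();`), so the injection only runs when assertions are enabled. See also https://github.com/apache/solr/blob/main/solr/core/src/java/org/apache/solr/query/SolrRangeQuery.java#L183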



##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -29,171 +38,387 @@
 import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.CommonTestInjection;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZkStateReaderTest extends SolrTestCaseJ4 {
-
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final long TIMEOUT = 30;
 
-  public void testExternalCollectionWatchedNotWatched() throws Exception {
-    Path zkDir = createTempDir("testExternalCollectionWatchedNotWatched");
-    ZkTestServer server = new ZkTestServer(zkDir);
-    SolrZkClient zkClient = null;
-    ZkStateReader reader = null;
-
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), 
OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
-      // create new collection
-      ZkWriteCommand c1 =
-          new ZkWriteCommand(
-              "c1",
-              new DocCollection(
-                  "c1",
-                  new HashMap<>(),
-                  Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                  DocRouter.DEFAULT,
-                  0));
-      writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(c1), null);
-      writer.writePendingUpdates();
-      reader.forceUpdateCollection("c1");
-
-      
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
-      reader.registerCore("c1");
-      
assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
-      reader.unregisterCore("c1");
-      
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+  private static class TestFixture implements Closeable {
+    private final ZkTestServer server;
+    private final SolrZkClient zkClient;
+    private final ZkStateReader reader;
+    private final ZkStateWriter writer;
+
+    private TestFixture(
+        ZkTestServer server, SolrZkClient zkClient, ZkStateReader reader, 
ZkStateWriter writer) {
+      this.server = server;
+      this.zkClient = zkClient;
+      this.reader = reader;
+      this.writer = writer;
+    }
 
-    } finally {
+    @Override
+    public void close() throws IOException {
       IOUtils.close(reader, zkClient);
-      server.shutdown();
+      try {
+        server.shutdown();
+      } catch (InterruptedException e) {
+        // ok. Shutting down anyway
+      }
     }
   }
 
-  public void testCollectionStateWatcherCaching() throws Exception {
-    Path zkDir = createTempDir("testCollectionStateWatcherCaching");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
+  private TestFixture fixture = null;
 
-    SolrZkClient zkClient = null;
-    ZkStateReader reader = null;
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    fixture = setupTestFixture(getTestName());
+  }
 
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), 
OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-      DocCollection state =
-          new DocCollection(
-              "c1",
-              new HashMap<>(),
-              Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-              DocRouter.DEFAULT,
-              0);
-      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
-      writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
-      writer.writePendingUpdates();
-      assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + 
"/c1/state.json", true));
-      reader.waitForState(
-          "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> 
collectionState != null);
-
-      Map<String, Object> props = new HashMap<>();
-      props.put("x", "y");
-      props.put(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
-      state = new DocCollection("c1", new HashMap<>(), props, 
DocRouter.DEFAULT, 0);
-      wc = new ZkWriteCommand("c1", state);
-      writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
-      writer.writePendingUpdates();
-
-      boolean found = false;
-      TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-      while (!timeOut.hasTimedOut()) {
-        DocCollection c1 = reader.getClusterState().getCollection("c1");
-        if ("y".equals(c1.getStr("x"))) {
-          found = true;
-          break;
-        }
-      }
-      assertTrue("Could not find updated property in collection c1 even after 
5 seconds", found);
-    } finally {
-      IOUtils.close(reader, zkClient);
-      server.shutdown();
+  @After
+  public void tearDown() throws Exception {
+    if (fixture != null) {
+      fixture.close();
     }
+    super.tearDown();
   }
 
-  public void testWatchedCollectionCreation() throws Exception {
-    Path zkDir = createTempDir("testWatchedCollectionCreation");
-
+  private static TestFixture setupTestFixture(String testPrefix) throws 
Exception {
+    Path zkDir = createTempDir(testPrefix);
     ZkTestServer server = new ZkTestServer(zkDir);
+    server.run();
+    SolrZkClient zkClient =
+        new SolrZkClient(server.getZkAddress(), 
OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+    ZkController.createClusterZkNodes(zkClient);
 
-    SolrZkClient zkClient = null;
-    ZkStateReader reader = null;
-
-    try {
-      server.run();
+    ZkStateReader reader = new ZkStateReader(zkClient);
+    reader.createClusterStateWatchersAndUpdate();
 
-      zkClient = new SolrZkClient(server.getZkAddress(), 
OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
+    ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
 
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-      reader.registerCore("c1");
+    return new TestFixture(server, zkClient, reader, writer);
+  }
 
-      // Initially there should be no c1 collection.
-      assertNull(reader.getClusterState().getCollectionRef("c1"));
+  public void testExternalCollectionWatchedNotWatched() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+    // create new collection
+    ZkWriteCommand c1 =
+        new ZkWriteCommand(
+            "c1",
+            new DocCollection(
+                "c1",
+                new HashMap<>(),
+                Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+                DocRouter.DEFAULT,
+                0));
+
+    writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(c1), null);
+    writer.writePendingUpdates();
+    reader.forceUpdateCollection("c1");
+
+    
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+    reader.registerCore("c1");
+    
assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+    reader.unregisterCore("c1");
+    
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+  }
 
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-      reader.forceUpdateCollection("c1");
+  public void testCollectionStateWatcherCaching() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+    DocCollection state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + 
"/c1/state.json", true));
+    reader.waitForState(
+        "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> 
collectionState != null);
+
+    Map<String, Object> props = new HashMap<>();
+    props.put("x", "y");
+    props.put(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
+    state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 
0);
+    wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    boolean found = false;
+    TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (!timeOut.hasTimedOut()) {
+      DocCollection c1 = reader.getClusterState().getCollection("c1");
+      if ("y".equals(c1.getStr("x"))) {
+        found = true;
+        break;
+      }
+    }
+    assertTrue("Could not find updated property in collection c1 even after 5 
seconds", found);
+  }
 
-      // Still no c1 collection, despite a collection path.
-      assertNull(reader.getClusterState().getCollectionRef("c1"));
+  public void testWatchedCollectionCreation() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    reader.registerCore("c1");
+
+    // Initially there should be no c1 collection.
+    assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+    reader.forceUpdateCollection("c1");
+
+    // Still no c1 collection, despite a collection path.
+    assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+    // create new collection
+    DocCollection state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + 
"/c1/state.json", true));
+
+    // reader.forceUpdateCollection("c1");
+    reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
+    ClusterState.CollectionRef ref = 
reader.getClusterState().getCollectionRef("c1");
+    assertNotNull(ref);
+    assertFalse(ref.isLazilyLoaded());
+  }
 
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+  public void testForciblyRefreshAllClusterState() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    reader.registerCore("c1"); // watching c1, so it should get non lazy 
reference
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    // Initially there should be no c1 collection.
+    assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+    // create new collection
+    DocCollection state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + 
"/c1/state.json", true));
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    ClusterState.CollectionRef ref = 
reader.getClusterState().getCollectionRef("c1");
+    assertNotNull(ref);
+    assertFalse(ref.isLazilyLoaded());
+    assertEquals(0, ref.get().getZNodeVersion());
+
+    // update the collection
+    state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            ref.get().getZNodeVersion());
+    wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    ref = reader.getClusterState().getCollectionRef("c1");
+    assertNotNull(ref);
+    assertFalse(ref.isLazilyLoaded());
+    assertEquals(1, ref.get().getZNodeVersion());
+
+    // delete the collection c1, add a collection c2 that is NOT watched
+    ZkWriteCommand wc1 = new ZkWriteCommand("c1", null);
+
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+    state =
+        new DocCollection(
+            "c2",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
+
+    writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), 
null);
+    writer.writePendingUpdates();
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    ref = reader.getClusterState().getCollectionRef("c1");
+    assertNull(ref);
+
+    ref = reader.getClusterState().getCollectionRef("c2");
+    assertNotNull(ref);
+    assertTrue(
+        "c2 should have been lazily loaded but is not!",
+        ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not 
watched
+    assertEquals(0, ref.get().getZNodeVersion());
+  }
 
-      // create new collection
-      DocCollection state =
-          new DocCollection(
-              "c1",
-              new HashMap<>(),
-              Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-              DocRouter.DEFAULT,
-              0);
-      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
-      writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(wc), null);
-      writer.writePendingUpdates();
+  public void testGetCurrentCollections() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    reader.registerCore("c1"); // listen to c1. not yet exist
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+    reader.forceUpdateCollection("c1");
+    Set<String> currentCollections = reader.getCurrentCollections();
+    assertEquals(0, currentCollections.size()); // no active collections yet
+
+    // now create both c1 (watched) and c2 (not watched)
+    DocCollection state1 =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
+    DocCollection state2 =
+        new DocCollection(
+            "c2",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+
+    // do not listen to c2
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+    ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2);
+
+    writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), 
null);
+    writer.writePendingUpdates();
+
+    reader.forceUpdateCollection("c1");
+    reader.forceUpdateCollection("c2");
+    currentCollections =
+        reader.getCurrentCollections(); // should detect both collections (c1 
watched, c2 lazy
+    // loaded)

Review Comment:
   nit: this wrapping is strange; could you move the comment onto its own line?



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -539,17 +672,17 @@ private void constructState(Set<String> 
changedCollections) {
     if (log.isDebugEnabled()) {
       log.debug(
           "clusterStateSet: interesting [{}] watched [{}] lazy [{}] total 
[{}]",
-          collectionWatches.keySet().size(),
-          watchedCollectionStates.keySet().size(),
+          collectionWatches.watchedCollections().size(),
+          collectionWatches.activeCollectionCount(),
           lazyCollectionStates.keySet().size(),
           clusterState.getCollectionStates().size());
     }
 
     if (log.isTraceEnabled()) {
       log.trace(
           "clusterStateSet: interesting [{}] watched [{}] lazy [{}] total 
[{}]",
-          collectionWatches.keySet(),
-          watchedCollectionStates.keySet(),
+          collectionWatches.watchedCollections(),
+          collectionWatches.activeCollections(),
           lazyCollectionStates.keySet(),
           clusterState.getCollectionStates());
     }

Review Comment:
   Is there still technically a race condition here, since the watched collections and the active collections could change between the successive calls (so the logged values might not be consistent with each other)?



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -246,6 +246,134 @@ public boolean canBeRemoved() {
     }
   }
 
+  /**
+   * A ConcurrentHashMap of active watcher by collection name
+   *
+   * <p>Each watcher DocCollectionWatch also contains the latest DocCollection 
(state) observed
+   */
+  private static class DocCollectionWatches {
+    private final ConcurrentHashMap<String, StatefulCollectionWatch>
+        statefulWatchesByCollectionName = new ConcurrentHashMap<>();
+
+    /**
+     * Gets the DocCollection (state) of the collection which the 
corresponding watch last observed
+     *
+     * @param collection the collection name to get DocCollection on
+     * @return The last observed DocCollection(state). if null, that means 
there's no such
+     *     collection.
+     */
+    private DocCollection getDocCollection(String collection) {
+      StatefulCollectionWatch watch = 
statefulWatchesByCollectionName.get(collection);
+      return watch != null ? watch.currentState : null;
+    }
+
+    /**
+     * Gets the active collections (collections that exist) being watched
+     *
+     * @return an immutable set of active collection names
+     */
+    private Set<String> activeCollections() {
+      return statefulWatchesByCollectionName.entrySet().stream()
+          .filter(
+              (Entry<String, StatefulCollectionWatch> entry) ->
+                  entry.getValue().currentState != null)
+          .map(Entry::getKey)
+          .collect(Collectors.toUnmodifiableSet());
+    }
+
+    /**
+     * Gets the count of active collections (collections that exist) being 
watched
+     *
+     * @return the count of active collections
+     */
+    private long activeCollectionCount() {
+      return statefulWatchesByCollectionName.entrySet().stream()
+          .filter(
+              (Entry<String, StatefulCollectionWatch> entry) ->
+                  entry.getValue().currentState != null)
+          .count();
+    }
+
+    private Set<String> watchedCollections() {
+      return 
Collections.unmodifiableSet(statefulWatchesByCollectionName.keySet());
+    }
+
+    private Set<Entry<String, StatefulCollectionWatch>> 
watchedCollectionEntries() {
+      return 
Collections.unmodifiableSet(statefulWatchesByCollectionName.entrySet());
+    }
+
+    /**
+     * Updates the latest observed DocCollection (state) of the {@link 
StatefulCollectionWatch} if
+     * the collection is being watched
+     *
+     * @param collection the collection name
+     * @param newState the new DocCollection (state) observed
+     * @return whether the state has changed for the watched collection
+     */
+    private boolean updateDocCollection(String collection, DocCollection 
newState) {
+      AtomicBoolean stateHasChanged = new AtomicBoolean(false);
+      statefulWatchesByCollectionName.computeIfPresent(
+          collection,
+          (col, watch) -> {
+            DocCollection oldState = watch.currentState;
+            if (oldState == null && newState == null) {
+              // OK, the collection not yet exist in ZK or already deleted
+            } else if (oldState == null) {
+              if (log.isDebugEnabled()) {
+                log.debug("Add data for [{}] ver [{}]", collection, 
newState.getZNodeVersion());
+              }
+              watch.currentState = newState;
+            } else if (newState == null) {
+              log.debug("Removing cached collection state for [{}]", 
collection);
+              watch.currentState = null;
+            } else { // both new and old states are non-null
+              int oldCVersion =
+                  oldState.getPerReplicaStates() == null
+                      ? -1
+                      : oldState.getPerReplicaStates().cversion;
+              int newCVersion =
+                  newState.getPerReplicaStates() == null
+                      ? -1
+                      : newState.getPerReplicaStates().cversion;
+              if (oldState.getZNodeVersion() < newState.getZNodeVersion()
+                  || oldCVersion < newCVersion) {
+                watch.currentState = newState;
+                if (log.isDebugEnabled()) {
+                  log.debug(
+                      "Updating data for [{}] from [{}] to [{}]",
+                      collection,
+                      oldState.getZNodeVersion(),
+                      newState.getZNodeVersion());
+                }
+              }
+            }
+            stateHasChanged.set(oldState != watch.currentState);
+            return watch;
+          });
+
+      return stateHasChanged.get();
+    }
+
+    /**
+     * Computes the new StatefulCollectionWatch by the supplied 
remappingFunction.

Review Comment:
   nit: add a `@see` Javadoc tag pointing to `Map.compute`.



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -525,8 +652,14 @@ private void constructState(Set<String> 
changedCollections) {
     Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>();
 
     // Add collections
-    for (Map.Entry<String, DocCollection> entry : 
watchedCollectionStates.entrySet()) {
-      result.put(entry.getKey(), new 
ClusterState.CollectionRef(entry.getValue()));
+    for (Entry<String, StatefulCollectionWatch> entry :
+        collectionWatches.watchedCollectionEntries()) {
+      if (entry.getValue().currentState != null) {
+        // if the doc is null for the collection watch, then it should not be 
inserted into the
+        // state
+        result.putIfAbsent(

Review Comment:
   Do we need to check the result here? What does it mean if there was already an entry, so that this call does not insert anything?



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -326,12 +432,11 @@ public void forciblyRefreshAllClusterStateSlow() throws 
KeeperException, Interru
       // No need to set watchers because we should already have watchers 
registered for everything.
       refreshCollectionList(null);
       refreshLiveNodes(null);
-      // Need a copy so we don't delete from what we're iterating over.
-      Collection<String> safeCopy = new 
ArrayList<>(watchedCollectionStates.keySet());
+
       Set<String> updatedCollections = new HashSet<>();
-      for (String coll : safeCopy) {
+      for (String coll : collectionWatches.watchedCollections()) {
         DocCollection newState = fetchCollectionState(coll, null);

Review Comment:
   Please add this justification as a comment to the code.



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -246,6 +246,134 @@ public boolean canBeRemoved() {
     }
   }
 
+  /**
+   * A ConcurrentHashMap of active watcher by collection name
+   *
+   * <p>Each watcher DocCollectionWatch also contains the latest DocCollection 
(state) observed
+   */
+  private static class DocCollectionWatches {
+    private final ConcurrentHashMap<String, StatefulCollectionWatch>
+        statefulWatchesByCollectionName = new ConcurrentHashMap<>();
+
+    /**
+     * Gets the DocCollection (state) of the collection which the 
corresponding watch last observed
+     *
+     * @param collection the collection name to get DocCollection on
+     * @return The last observed DocCollection(state). if null, that means 
there's no such
+     *     collection.
+     */
+    private DocCollection getDocCollection(String collection) {
+      StatefulCollectionWatch watch = 
statefulWatchesByCollectionName.get(collection);
+      return watch != null ? watch.currentState : null;
+    }
+
+    /**
+     * Gets the active collections (collections that exist) being watched
+     *
+     * @return an immutable set of active collection names
+     */
+    private Set<String> activeCollections() {
+      return statefulWatchesByCollectionName.entrySet().stream()
+          .filter(
+              (Entry<String, StatefulCollectionWatch> entry) ->
+                  entry.getValue().currentState != null)
+          .map(Entry::getKey)
+          .collect(Collectors.toUnmodifiableSet());
+    }
+
+    /**
+     * Gets the count of active collections (collections that exist) being 
watched
+     *
+     * @return the count of active collections
+     */
+    private long activeCollectionCount() {
+      return statefulWatchesByCollectionName.entrySet().stream()
+          .filter(
+              (Entry<String, StatefulCollectionWatch> entry) ->
+                  entry.getValue().currentState != null)
+          .count();
+    }
+
+    private Set<String> watchedCollections() {
+      return 
Collections.unmodifiableSet(statefulWatchesByCollectionName.keySet());
+    }
+
+    private Set<Entry<String, StatefulCollectionWatch>> 
watchedCollectionEntries() {
+      return 
Collections.unmodifiableSet(statefulWatchesByCollectionName.entrySet());
+    }
+
+    /**
+     * Updates the latest observed DocCollection (state) of the {@link 
StatefulCollectionWatch} if
+     * the collection is being watched
+     *
+     * @param collection the collection name
+     * @param newState the new DocCollection (state) observed
+     * @return whether the state has changed for the watched collection
+     */
+    private boolean updateDocCollection(String collection, DocCollection 
newState) {
+      AtomicBoolean stateHasChanged = new AtomicBoolean(false);
+      statefulWatchesByCollectionName.computeIfPresent(
+          collection,
+          (col, watch) -> {
+            DocCollection oldState = watch.currentState;
+            if (oldState == null && newState == null) {
+              // OK, the collection not yet exist in ZK or already deleted
+            } else if (oldState == null) {
+              if (log.isDebugEnabled()) {
+                log.debug("Add data for [{}] ver [{}]", collection, 
newState.getZNodeVersion());
+              }
+              watch.currentState = newState;
+            } else if (newState == null) {
+              log.debug("Removing cached collection state for [{}]", 
collection);
+              watch.currentState = null;
+            } else { // both new and old states are non-null
+              int oldCVersion =
+                  oldState.getPerReplicaStates() == null
+                      ? -1
+                      : oldState.getPerReplicaStates().cversion;
+              int newCVersion =
+                  newState.getPerReplicaStates() == null
+                      ? -1
+                      : newState.getPerReplicaStates().cversion;
+              if (oldState.getZNodeVersion() < newState.getZNodeVersion()
+                  || oldCVersion < newCVersion) {

Review Comment:
   Can we add a test that creates c1, gets the ref, deletes c1, creates c1 again, and then checks that the new ref is correct? The znode version of both instances of c1 would be 0, but the cversion of the second instance should be larger. So I think your logic still works, but I'd like to see a unit test for it. Or maybe this is already covered and I missed where it is in the tests? Thanks! This is similar to what we saw in SOLR-16143.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org

Reply via email to