HoustonPutman commented on code in PR #909:
URL: https://github.com/apache/solr/pull/909#discussion_r903111897


##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -525,8 +606,14 @@ private void constructState(Set<String> changedCollections) {
     Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>();
 
     // Add collections
-    for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
-      result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
+    for (Map.Entry<String, DocCollectionWatch<DocCollectionWatcher>> entry :
+        collectionWatches.entrySet()) {
+      if (entry.getValue().currentDoc
+          != null) { // if the doc is null for the collection watch, then it should not be inserted

Review Comment:
   Can you put the comment on the next line? That way the formatting of the if 
statement is preserved.



##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -16,28 +16,32 @@
  */
 package org.apache.solr.cloud.overseer;
 
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;

Review Comment:
   This might be an IDE thing, but please don't collapse these into a wildcard
import. (This also happened below.)



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -246,6 +244,90 @@ public boolean canBeRemoved() {
     }
   }
 
+  /**
+   * A ConcurrentHashMap of active watcher by collection name
+   *
+   * <p>Each watcher DocCollectionWatch also contains the latest DocCollection (state) observed
+   */
+  private static class DocCollectionWatches
+      extends ConcurrentHashMap<String, DocCollectionWatch<DocCollectionWatcher>> {
+
+    /**
+     * Gets the DocCollection (state) of the collection which the corresponding watch last observed
+     *
+     * @param collection the collection name to get DocCollection on
+     * @return The last observed DocCollection(state). if null, that means there's no such
+     *     collection.
+     */
+    private DocCollection getDocCollection(String collection) {
+      DocCollectionWatch<DocCollectionWatcher> watch = get(collection);
+      return watch != null ? watch.currentDoc : null;
+    }
+
+    /**
+     * Gets the active collections (collections that exist) being watched
+     *
+     * @return an immutable set of active collection names
+     */
+    private Set<String> activeCollections() {
+      return this.entrySet().stream()
+          .filter(
+              (Entry<String, DocCollectionWatch<DocCollectionWatcher>> entry) ->
+                  entry.getValue().currentDoc != null)
+          .map(Entry::getKey)
+          .collect(Collectors.toUnmodifiableSet());
+    }
+
+    /**
+     * Updates the latest observed DocCollection (state) of the {@link DocCollectionWatch} if the
+     * collection is being watched
+     *
+     * @param collection the collection name
+     * @param newState the new DocCollection (state) observed
+     * @return whether an active watch exists for such collection
+     */
+    private boolean updateDocCollection(String collection, DocCollection newState) {
+      DocCollectionWatch<DocCollectionWatcher> watch = get(collection);
+      if (watch != null) {
+        DocCollection oldState = watch.currentDoc;
+        if (oldState == null && newState == null) {
+          // OK, the collection not yet exist in ZK
+        } else if (oldState == null) {
+          if (log.isDebugEnabled()) {
+            log.debug("Add data for [{}] ver [{}]", collection, newState.getZNodeVersion());
+          }
+          watch.currentDoc = newState;
+        } else if (newState == null) {
+          log.debug("Removing cached collection state for [{}]", collection);
+          watch.currentDoc = null;
+        } else { // both new and old states are non-null
+          int oldCVersion =
+              oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
+          int newCVersion =
+              newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
+          if (oldState.getZNodeVersion() < newState.getZNodeVersion()
+              || oldCVersion < newCVersion) {
+            watch.currentDoc = newState;
+            if (log.isDebugEnabled()) {
+              log.debug(
+                  "Updating data for [{}] from [{}] to [{}]",
+                  collection,
+                  oldState.getZNodeVersion(),
+                  newState.getZNodeVersion());
+            }
+          }
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }

Review Comment:
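   This looks like where the race condition could occur: there's no guarantee
that `updateDocCollection()` is not called concurrently for the same
collection, so two callers can interleave between reading `watch.currentDoc`
and writing it. Doing the update inside `computeIfPresent()` should narrow that
window, though I'm not certain it removes the race entirely.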
   
   ```suggestion
       DocCollectionWatch<DocCollectionWatcher> finalWatch = computeIfPresent(collection, (col, watch) -> {
         if (watch != null) {
           DocCollection oldState = watch.currentDoc;
           if (oldState == null && newState == null) {
             // OK, the collection not yet exist in ZK
           } else if (oldState == null) {
             if (log.isDebugEnabled()) {
               log.debug("Add data for [{}] ver [{}]", collection, newState.getZNodeVersion());
             }
             watch.currentDoc = newState;
           } else if (newState == null) {
             log.debug("Removing cached collection state for [{}]", collection);
             watch.currentDoc = null;
           } else { // both new and old states are non-null
             int oldCVersion =
                 oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
             int newCVersion =
                 newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
             if (oldState.getZNodeVersion() < newState.getZNodeVersion()
                 || oldCVersion < newCVersion) {
               watch.currentDoc = newState;
               if (log.isDebugEnabled()) {
                 log.debug(
                     "Updating data for [{}] from [{}] to [{}]",
                     collection,
                     oldState.getZNodeVersion(),
                     newState.getZNodeVersion());
               }
             }
           }
         }
         return watch;
       });
       return finalWatch != null;
   ```
   
   Also, I'm not sure we need to create a new class for this; we can likely
just keep a ConcurrentHashMap variable and have the helper methods outside.
But let's focus on the logic first.
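
To make both points concrete, here is a minimal, self-contained sketch rather than Solr's actual code. `StateSketch`, `WatchSketch`, and `CollectionWatchesSketch` are hypothetical stand-ins for `DocCollection`, `DocCollectionWatch`, and the proposed helper class, and the version check is simplified to a single integer (no `cversion`). It assumes only the documented behaviour of `ConcurrentHashMap.computeIfPresent`, which runs the remapping function atomically for a given key.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for DocCollection: only carries a version number.
final class StateSketch {
  final int version;

  StateSketch(int version) {
    this.version = version;
  }
}

// Hypothetical stand-in for DocCollectionWatch.
final class WatchSketch {
  // volatile so plain readers outside computeIfPresent see the latest write
  volatile StateSketch currentDoc;
}

// Keeps the map as a field instead of subclassing ConcurrentHashMap.
final class CollectionWatchesSketch {
  private final ConcurrentHashMap<String, WatchSketch> watches = new ConcurrentHashMap<>();

  // Registers a watch for the collection if none exists yet.
  void watch(String collection) {
    watches.computeIfAbsent(collection, name -> new WatchSketch());
  }

  // Returns the last observed state, or null if unwatched or not yet seen.
  StateSketch get(String collection) {
    WatchSketch w = watches.get(collection);
    return w != null ? w.currentDoc : null;
  }

  // Returns whether a watch exists. The read, compare, and write of
  // currentDoc all happen inside the remapping function, so two concurrent
  // calls for the same collection cannot interleave between them.
  boolean update(String collection, StateSketch newState) {
    WatchSketch result =
        watches.computeIfPresent(
            collection,
            (name, w) -> {
              StateSketch old = w.currentDoc;
              // Accept first data, removals, and strictly newer versions only.
              if (old == null || newState == null || old.version < newState.version) {
                w.currentDoc = newState;
              }
              return w; // keep the mapping; only its contents change
            });
    return result != null;
  }
}
```

With this shape, a stale update (lower version) arriving after a newer one is dropped, and an update for an unwatched collection returns `false` without creating an entry. Writers that touch `currentDoc` outside `update()` would still bypass the per-key atomicity, so this only helps if all writes go through the same path.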



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

