This is an automated email from the ASF dual-hosted git repository.

magibney pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new f5f2ea90cc5 SOLR-11535: Fix race condition in singleton-per-collection StateWatcher creation (#1964)
f5f2ea90cc5 is described below

commit f5f2ea90cc5cfe40b30f62d25a330699fe1680ab
Author: Michael Gibney <[email protected]>
AuthorDate: Wed Apr 10 14:54:09 2024 -0400

    SOLR-11535: Fix race condition in singleton-per-collection StateWatcher creation (#1964)
    
    (cherry picked from commit 2b9d4c85dd58920df02317069805f4f5d3abb5c3)
---
 solr/CHANGES.txt                                   |  2 +
 .../apache/solr/cloud/ActiveReplicaWatcher.java    |  1 +
 .../java/org/apache/solr/cloud/ZkController.java   |  1 +
 .../solr/cloud/overseer/ZkStateReaderTest.java     | 81 ++++++++++++++++++++++
 .../apache/solr/common/cloud/ZkStateReader.java    | 60 +++++++++++-----
 5 files changed, 127 insertions(+), 18 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0653d062861..5db04567e28 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -92,6 +92,8 @@ Bug Fixes
 
 * SOLR-17176: Fix log history V2 API serialization (Michael Gibney)
 
+* SOLR-11535: Fix race condition in singleton-per-collection StateWatcher creation (Michael Gibney)
+
 Dependency Upgrades
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java
index 4fe3963b7cd..30cd1ba165d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java
@@ -119,6 +119,7 @@ public class ActiveReplicaWatcher implements CollectionStateWatcher {
   }
 
   // synchronized due to SOLR-11535
+  // TODO: can we remove `synchronized`, now that SOLR-11535 is fixed?
   @Override
   public synchronized boolean onStateChanged(Set<String> liveNodes, 
DocCollection collectionState) {
     if (log.isDebugEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 20e23a624d9..09bd049bab4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -2827,6 +2827,7 @@ public class ZkController implements Closeable {
 
     @Override
     // synchronized due to SOLR-11535
+    // TODO: can we remove `synchronized`, now that SOLR-11535 is fixed?
     public synchronized boolean onStateChanged(DocCollection collectionState) {
       if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 43c52d96a19..710fdc20e2e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -26,11 +26,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.lucene.util.IOUtils;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.OverseerTest;
@@ -691,6 +696,82 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
     }
   }
 
+  /**
+   * Simulates race condition that can arise from the normal way in which the 
removal of collection
+   * StateWatchers is deferred.
+   *
+   * <p>StateWatchers are registered at the level of Zk code, so when 
StateWatchers are removed in
+   * Solr code, the actual removal is deferred until the next callback for the 
associated collection
+   * fires, at which point the removed watcher should allow itself to expire. 
If a watcher is
+   * re-added for the associated collection in the intervening time, only the 
most recently added
+   * watcher should re-register; the removed watcher should simply expire.
+   *
+   * <p>Duplicate/redundant StateWatchers should no longer be registered with 
the new code that
+   * tracks the currently registered singleton-per-collection watcher in Solr 
code, and only
+   * re-registers the currently active watcher, with all other watchers 
allowing themselves to
+   * expire.
+   */
+  public void testStateWatcherRaceCondition() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    final ZkStateReader reader = fixture.reader;
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+    int extraWatchers = 10;
+    int iterations = 10;
+    for (int i = 0; i < extraWatchers; i++) {
+      // add and remove a bunch of watchers
+      DocCollectionWatcher w = (coll) -> false;
+      try {
+        reader.registerDocCollectionWatcher("c1", w);
+      } finally {
+        reader.removeDocCollectionWatcher("c1", w);
+      }
+    }
+    final ConcurrentHashMap<Integer, LongAdder> invoked = new 
ConcurrentHashMap<>();
+    CyclicBarrier barrier = new CyclicBarrier(2);
+    reader.registerDocCollectionWatcher(
+        "c1",
+        (coll) -> {
+          // add a watcher that tracks how many times it's invoked per znode 
version
+          if (coll != null) {
+            try {
+              barrier.await(250, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException | TimeoutException | 
BrokenBarrierException e) {
+              throw new RuntimeException(e);
+            }
+            invoked.computeIfAbsent(coll.getZNodeVersion(), (k) -> new 
LongAdder()).increment();
+          }
+          return false;
+        });
+
+    ClusterState clusterState = reader.getClusterState();
+    int dataVersion = -1;
+    for (int i = 0; i < iterations; i++) {
+      // create or update collection
+      DocCollection state =
+          DocCollection.create(
+              "c1",
+              new HashMap<>(),
+              Map.of(ZkStateReader.CONFIGNAME_PROP, 
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+              DocRouter.DEFAULT,
+              dataVersion,
+              Instant.now(),
+              PerReplicaStatesOps.getZkClientPrsSupplier(
+                  fixture.zkClient, DocCollection.getCollectionPath("c1")));
+      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+      writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+      clusterState = writer.writePendingUpdates();
+      barrier.await(250, TimeUnit.MILLISECONDS); // wait for the watch 
callback to execute
+      fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1" + i, 
true);
+      dataVersion = clusterState.getCollectionOrNull("c1").getZNodeVersion();
+    }
+    // expect to have been invoked for each iteration ...
+    assertEquals(iterations, invoked.size());
+    // ... and only _once_ for each iteration
+    assertTrue(
+        "wrong number of watchers (expected 1): " + invoked,
+        invoked.values().stream().mapToLong(LongAdder::sum).allMatch((l) -> l 
== 1));
+  }
+
   /**
    * Ensure that collection state fetching (getCollectionLive etc.) would not 
throw exception when
    * the state.json is deleted in between the state.json read and PRS entries 
read
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index ffb19c3a32f..c57fcb03400 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -370,6 +370,22 @@ public class ZkStateReader implements SolrCloseable {
 
   private static class StatefulCollectionWatch extends 
CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    /**
+     * The {@link StateWatcher} that is associated with this {@link 
StatefulCollectionWatch}. It is
+     * necessary to track this because of the way {@link StateWatcher} 
instances expire
+     * asynchronously: once registered with ZooKeeper, a {@link StateWatcher} 
cannot be removed, and
+     * its {@link StateWatcher#process(WatchedEvent)} method will be invoked 
upon node update.
+     * Because it is not possible to synchronously remove the {@link 
StateWatcher} as part of a
+     * transaction with {@link ZkStateReader#collectionWatches}, we keep track 
of a unique {@link
+     * StateWatcher} here, so that all other {@link StateWatcher}s may 
properly expire in a deferred
+     * way.
+     */
+    private volatile StateWatcher associatedWatcher;
+
+    private StatefulCollectionWatch(StateWatcher associatedWatcher) {
+      this.associatedWatcher = associatedWatcher;
+    }
   }
 
   public static final Set<String> KNOWN_CLUSTER_PROPS =
@@ -649,8 +665,10 @@ public class ZkStateReader implements SolrCloseable {
 
   /** Refresh collections. */
   private void refreshCollections() {
-    for (String coll : collectionWatches.watchedCollections()) {
-      new StateWatcher(coll).refreshAndWatch();
+    for (Entry<String, StatefulCollectionWatch> e : 
collectionWatches.watchedCollectionEntries()) {
+      StateWatcher newStateWatcher = new StateWatcher(e.getKey());
+      e.getValue().associatedWatcher = newStateWatcher;
+      newStateWatcher.refreshAndWatch();
     }
   }
 
@@ -1334,8 +1352,9 @@ public class ZkStateReader implements SolrCloseable {
         return;
       }
 
-      if (!collectionWatches.watchedCollections().contains(coll)) {
-        // This collection is no longer interesting, stop watching.
+      StatefulCollectionWatch scw = 
collectionWatches.statefulWatchesByCollectionName.get(coll);
+      if (scw == null || scw.associatedWatcher != this) {
+        // Collection no longer interesting, or we have been replaced by a 
different watcher.
         log.debug("Uninteresting collection {}", coll);
         return;
       }
@@ -1677,19 +1696,21 @@ public class ZkStateReader implements SolrCloseable {
    * @see ZkStateReader#unregisterCore(String)
    */
   public void registerCore(String collection) {
-    AtomicBoolean reconstructState = new AtomicBoolean(false);
+    AtomicReference<StateWatcher> newWatcherRef = new AtomicReference<>();
     collectionWatches.compute(
         collection,
         (k, v) -> {
           if (v == null) {
-            reconstructState.set(true);
-            v = new StatefulCollectionWatch();
+            StateWatcher stateWatcher = new StateWatcher(collection);
+            newWatcherRef.set(stateWatcher);
+            v = new StatefulCollectionWatch(stateWatcher);
           }
           v.coreRefCount++;
           return v;
         });
-    if (reconstructState.get()) {
-      new StateWatcher(collection).refreshAndWatch();
+    StateWatcher newWatcher = newWatcherRef.get();
+    if (newWatcher != null) {
+      newWatcher.refreshAndWatch();
     }
   }
 
@@ -1761,26 +1782,29 @@ public class ZkStateReader implements SolrCloseable {
    * <p>The Watcher will automatically be removed when it's 
<code>onStateChanged</code> returns
    * <code>true</code>
    */
-  public void registerDocCollectionWatcher(String collection, 
DocCollectionWatcher stateWatcher) {
-    AtomicBoolean watchSet = new AtomicBoolean(false);
+  public void registerDocCollectionWatcher(
+      String collection, DocCollectionWatcher docCollectionWatcher) {
+    AtomicReference<StateWatcher> newWatcherRef = new AtomicReference<>();
     collectionWatches.compute(
         collection,
         (k, v) -> {
           if (v == null) {
-            v = new StatefulCollectionWatch();
-            watchSet.set(true);
+            StateWatcher stateWatcher = new StateWatcher(collection);
+            newWatcherRef.set(stateWatcher);
+            v = new StatefulCollectionWatch(stateWatcher);
           }
-          v.stateWatchers.add(stateWatcher);
+          v.stateWatchers.add(docCollectionWatcher);
           return v;
         });
 
-    if (watchSet.get()) {
-      new StateWatcher(collection).refreshAndWatch();
+    StateWatcher newWatcher = newWatcherRef.get();
+    if (newWatcher != null) {
+      newWatcher.refreshAndWatch();
     }
 
     DocCollection state = clusterState.getCollectionOrNull(collection);
-    if (stateWatcher.onStateChanged(state) == true) {
-      removeDocCollectionWatcher(collection, stateWatcher);
+    if (docCollectionWatcher.onStateChanged(state) == true) {
+      removeDocCollectionWatcher(collection, docCollectionWatcher);
     }
   }
 

Reply via email to