This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 981f6789ebf SOLR-16116: Fix various issues with Curator (#2855)
981f6789ebf is described below

commit 981f6789ebfb527636972dbba6a7d3cfa74355cf
Author: Houston Putman <[email protected]>
AuthorDate: Mon Nov 11 15:45:32 2024 -0600

    SOLR-16116: Fix various issues with Curator (#2855)
---
 .../java/org/apache/solr/cloud/ZkController.java   | 75 ++++++++++++++++------
 .../java/org/apache/solr/cloud/ZkShardTerms.java   | 14 ++--
 .../apache/solr/cloud/DistributedQueueTest.java    |  8 ++-
 .../apache/solr/common/cloud/BeforeReconnect.java  | 32 ---------
 .../org/apache/solr/common/cloud/OnDisconnect.java |  6 +-
 .../org/apache/solr/common/cloud/OnReconnect.java  |  9 +--
 .../org/apache/solr/common/cloud/SolrZkClient.java | 14 ++--
 7 files changed, 81 insertions(+), 77 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 59480caa8e9..5cb313f717a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -368,7 +368,8 @@ public class ZkController implements Closeable {
             .withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
             .withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
             .withReconnectListener(() -> onReconnect(descriptorsSupplier))
-            .withBeforeConnect(() -> beforeReconnect(descriptorsSupplier))
+            .withDisconnectListener(
+                (sessionExpired) -> onDisconnect(descriptorsSupplier, 
sessionExpired))
             .withAclProvider(zkACLProvider)
             .withClosedCheck(cc::isShutDown)
             .withCompressor(compressor)
@@ -404,13 +405,14 @@ public class ZkController implements Closeable {
     assert ObjectReleaseTracker.track(this);
   }
 
-  private void beforeReconnect(Supplier<List<CoreDescriptor>> 
descriptorsSupplier) {
+  private void onDisconnect(
+      Supplier<List<CoreDescriptor>> descriptorsSupplier, boolean 
sessionExpired) {
     try {
       overseer.close();
     } catch (Exception e) {
       log.error("Error trying to stop any Overseer threads", e);
     }
-    closeOutstandingElections(descriptorsSupplier);
+    closeOutstandingElections(descriptorsSupplier, sessionExpired);
     markAllAsNotLeader(descriptorsSupplier);
   }
 
@@ -419,6 +421,8 @@ public class ZkController implements Closeable {
     log.info("ZooKeeper session re-connected ... refreshing core states after 
session expiration.");
     clearZkCollectionTerms();
     try {
+      // Remove the live node in case it is still there
+      removeEphemeralLiveNode();
       // recreate our watchers first so that they exist even on any problems 
below
       zkStateReader.createClusterStateWatchersAndUpdate();
 
@@ -663,16 +667,17 @@ public class ZkController implements Closeable {
     return sysPropsCacher;
   }
 
-  private void closeOutstandingElections(final Supplier<List<CoreDescriptor>> 
registerOnReconnect) {
+  private void closeOutstandingElections(
+      final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean 
sessionExpired) {
     List<CoreDescriptor> descriptors = registerOnReconnect.get();
     if (descriptors != null) {
       for (CoreDescriptor descriptor : descriptors) {
-        closeExistingElectionContext(descriptor);
+        closeExistingElectionContext(descriptor, sessionExpired);
       }
     }
   }
 
-  private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
+  private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean 
sessionExpired) {
     // look for old context - if we find it, cancel it
     String collection = cd.getCloudDescriptor().getCollectionName();
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
@@ -682,7 +687,11 @@ public class ZkController implements Closeable {
 
     if (prevContext != null) {
       prevContext.close();
-      electionContexts.remove(contextKey);
+      // Only remove the election contexts if the session expired, otherwise 
the ephemeral nodes
+      // will still exist
+      if (sessionExpired) {
+        electionContexts.remove(contextKey);
+      }
     }
 
     return contextKey;
@@ -1209,6 +1218,19 @@ public class ZkController implements Closeable {
     } catch (NoNodeException e) {
 
     }
+    cc.nodeRoles
+        .getRoles()
+        .forEach(
+            (role, mode) -> {
+              try {
+                zkClient.delete(
+                    NodeRoles.getZNodeForRoleMode(role, mode) + "/" + 
nodeName, -1, true);
+              } catch (KeeperException e) {
+
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              }
+            });
   }
 
   public String getNodeName() {
@@ -1740,6 +1762,14 @@ public class ZkController implements Closeable {
         cd.getCloudDescriptor().setLastPublished(state);
       }
       DocCollection coll = zkStateReader.getCollection(collection);
+      // extra handling for PRS, we need to write the PRS entries from this 
node directly,
+      // as overseer does not and should not handle those entries
+      if (coll != null && coll.isPerReplicaState() && coreNodeName != null) {
+        PerReplicaStates perReplicaStates =
+            PerReplicaStatesOps.fetch(coll.getZNode(), zkClient, 
coll.getPerReplicaStates());
+        PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
+            .persist(coll.getZNode(), zkClient);
+      }
       if (forcePublish || updateStateDotJson(coll, coreNodeName)) {
         if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
           distributedClusterStateUpdater.doSingleStateUpdate(
@@ -1751,14 +1781,6 @@ public class ZkController implements Closeable {
           overseerJobQueue.offer(m);
         }
       }
-      // extra handling for PRS, we need to write the PRS entries from this 
node directly,
-      // as overseer does not and should not handle those entries
-      if (coll != null && coll.isPerReplicaState() && coreNodeName != null) {
-        PerReplicaStates perReplicaStates =
-            PerReplicaStatesOps.fetch(coll.getZNode(), zkClient, 
coll.getPerReplicaStates());
-        PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
-            .persist(coll.getZNode(), zkClient);
-      }
     } finally {
       MDCLoggingContext.clear();
     }
@@ -1944,7 +1966,7 @@ public class ZkController implements Closeable {
     // before becoming available, make sure we are not live and active
     // this also gets us our assigned shard id if it was not specified
     try {
-      checkStateInZk(cd);
+      checkStateInZk(cd, null);
 
       CloudDescriptor cloudDesc = cd.getCloudDescriptor();
 
@@ -2001,7 +2023,7 @@ public class ZkController implements Closeable {
     return !replica.getNodeName().equals(getNodeName());
   }
 
-  private void checkStateInZk(CoreDescriptor cd)
+  private void checkStateInZk(CoreDescriptor cd, Replica.State state)
       throws InterruptedException, NotInClusterStateException {
     CloudDescriptor cloudDesc = cd.getCloudDescriptor();
     String nodeName = cloudDesc.getCoreNodeName();
@@ -2037,6 +2059,16 @@ public class ZkController implements Closeable {
                       + ", ignore the exception if the replica was deleted");
               return false;
             }
+            if (state != null && !state.equals(replica.getState())) {
+              errorMessage.set(
+                  "coreNodeName "
+                      + coreNodeName
+                      + " does not have the expected state "
+                      + state
+                      + ", found state was: "
+                      + replica.getState());
+              return false;
+            }
             return true;
           });
     } catch (TimeoutException e) {
@@ -2120,10 +2152,15 @@ public class ZkController implements Closeable {
     if (!isLeader && !SKIP_AUTO_RECOVERY) {
       if (!getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
         log.debug(
-            "Term of replica {} is already less than leader, so not waiting 
for leader to see down state.",
+            "Term of replica {} is already less than leader, so not waiting 
for leader to see down state."
+                + " Instead, make sure we can see the down state in 
Zookeeper.",
             myCoreNodeName);
+        try {
+          checkStateInZk(descriptor, Replica.State.DOWN);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
       } else {
-
         if (log.isInfoEnabled()) {
           log.info(
               "replica={} is making a best effort attempt to wait for 
leader={} to see it's DOWN state.",
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 83f8885325b..e58e0e5ec32 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -406,21 +406,21 @@ public class ZkShardTerms implements AutoCloseable {
     Watcher watcher =
         event -> {
           // Don't do anything if we are closed
-          if (isClosed.get()) {
+          if (isClosed.get() || 
!event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
             return;
           }
           // session events are not change events, and do not remove the 
watcher
           if (Watcher.Event.EventType.None == event.getType()) {
             return;
           }
+          // Some events may be missed during registering a watcher, so it is 
safer to refresh terms
+          // after registering watcher
           retryRegisterWatcher();
-          // if term node is deleted, refresh cannot possibly succeed
-          if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
-            return;
+          // Only refresh the data if the node was created or its data changed.
+          if (Watcher.Event.EventType.NodeCreated == event.getType()
+              || Watcher.Event.EventType.NodeDataChanged == event.getType()) {
+            refreshTerms();
           }
-          // Some events may be missed during register a watcher, so it is 
safer to refresh terms
-          // after registering watcher
-          refreshTerms();
         };
     try {
       // exists operation is faster than getData operation
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 213a5db6d6d..152a5742bd9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -295,7 +295,13 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     zkClient
         .getCuratorFramework()
         .getConnectionStateListenable()
-        .addListener((OnDisconnect) hasDisconnected::countDown);
+        .addListener(
+            (OnDisconnect)
+                ((sessionExpired) -> {
+                  if (sessionExpired) {
+                    hasDisconnected.countDown();
+                  }
+                }));
     long sessionId = zkClient.getZkSessionId();
     zkServer.expire(sessionId);
     hasDisconnected.await(10, TimeUnit.SECONDS);
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/BeforeReconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/BeforeReconnect.java
deleted file mode 100644
index 6e028056013..00000000000
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/BeforeReconnect.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.common.cloud;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-
-public interface BeforeReconnect extends ConnectionStateListener {
-  public void command();
-
-  @Override
-  default void stateChanged(CuratorFramework client, ConnectionState newState) 
{
-    if (newState == ConnectionState.LOST || newState == 
ConnectionState.SUSPENDED) {
-      command();
-    }
-  }
-}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
index f33013c5185..cd24a2ec4d1 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
@@ -21,12 +21,12 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 
 public interface OnDisconnect extends ConnectionStateListener {
-  public void command();
+  public void command(boolean sessionExpired);
 
   @Override
   default void stateChanged(CuratorFramework client, ConnectionState newState) 
{
-    if (newState == ConnectionState.LOST) {
-      command();
+    if (newState == ConnectionState.LOST || newState == 
ConnectionState.SUSPENDED) {
+      command(newState == ConnectionState.LOST);
     }
   }
 }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 1863d797938..906a8368c35 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.common.cloud;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
@@ -31,16 +30,10 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 public interface OnReconnect extends ConnectionStateListener {
   void command();
 
-  AtomicBoolean sessionEnded = new AtomicBoolean(false);
-
   @Override
   default void stateChanged(CuratorFramework client, ConnectionState newState) 
{
     if (ConnectionState.RECONNECTED.equals(newState)) {
-      if (sessionEnded.getAndSet(false)) {
-        command();
-      }
-    } else if (ConnectionState.LOST == newState) {
-      sessionEnded.set(true);
+      command();
     }
   }
 }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 9ae720cb180..d8abea5529d 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -120,7 +120,7 @@ public class SolrZkClient implements Closeable {
         builder.zkCredentialsProvider,
         builder.aclProvider,
         builder.onReconnect,
-        builder.beforeReconnect,
+        builder.onDisconnect,
         builder.higherLevelIsClosed,
         builder.minStateByteLenForCompression,
         builder.compressor,
@@ -135,7 +135,7 @@ public class SolrZkClient implements Closeable {
       ZkCredentialsProvider zkCredentialsProvider,
       ACLProvider aclProvider,
       final OnReconnect onReconnect,
-      BeforeReconnect beforeReconnect,
+      OnDisconnect onDisconnect,
       IsClosed higherLevelIsClosed,
       int minStateByteLenForCompression,
       Compressor compressor,
@@ -205,10 +205,10 @@ public class SolrZkClient implements Closeable {
           .getConnectionStateListenable()
           .addListener(onReconnect, zkConnectionListenerCallbackExecutor);
     }
-    if (beforeReconnect != null) {
+    if (onDisconnect != null) {
       client
           .getConnectionStateListenable()
-          .addListener(beforeReconnect, zkConnectionListenerCallbackExecutor);
+          .addListener(onDisconnect, zkConnectionListenerCallbackExecutor);
     }
     client.start();
     try {
@@ -1257,7 +1257,7 @@ public class SolrZkClient implements Closeable {
     public int zkClientTimeout = SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
     public int zkClientConnectTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
     public OnReconnect onReconnect;
-    public BeforeReconnect beforeReconnect;
+    public OnDisconnect onDisconnect;
     public ZkCredentialsProvider zkCredentialsProvider;
     public ACLProvider aclProvider;
     public IsClosed higherLevelIsClosed;
@@ -1313,8 +1313,8 @@ public class SolrZkClient implements Closeable {
       return this;
     }
 
-    public Builder withBeforeConnect(BeforeReconnect beforeReconnect) {
-      this.beforeReconnect = beforeReconnect;
+    public Builder withDisconnectListener(OnDisconnect onDisconnect) {
+      this.onDisconnect = onDisconnect;
       return this;
     }
 

Reply via email to