This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 981f6789ebf SOLR-16116: Fix various issues with Curator (#2855)
981f6789ebf is described below
commit 981f6789ebfb527636972dbba6a7d3cfa74355cf
Author: Houston Putman <[email protected]>
AuthorDate: Mon Nov 11 15:45:32 2024 -0600
SOLR-16116: Fix various issues with Curator (#2855)
---
.../java/org/apache/solr/cloud/ZkController.java | 75 ++++++++++++++++------
.../java/org/apache/solr/cloud/ZkShardTerms.java | 14 ++--
.../apache/solr/cloud/DistributedQueueTest.java | 8 ++-
.../apache/solr/common/cloud/BeforeReconnect.java | 32 ---------
.../org/apache/solr/common/cloud/OnDisconnect.java | 6 +-
.../org/apache/solr/common/cloud/OnReconnect.java | 9 +--
.../org/apache/solr/common/cloud/SolrZkClient.java | 14 ++--
7 files changed, 81 insertions(+), 77 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 59480caa8e9..5cb313f717a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -368,7 +368,8 @@ public class ZkController implements Closeable {
.withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
.withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
.withReconnectListener(() -> onReconnect(descriptorsSupplier))
- .withBeforeConnect(() -> beforeReconnect(descriptorsSupplier))
+ .withDisconnectListener(
+ (sessionExpired) -> onDisconnect(descriptorsSupplier, sessionExpired))
.withAclProvider(zkACLProvider)
.withClosedCheck(cc::isShutDown)
.withCompressor(compressor)
@@ -404,13 +405,14 @@ public class ZkController implements Closeable {
assert ObjectReleaseTracker.track(this);
}
- private void beforeReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) {
+ private void onDisconnect(
+ Supplier<List<CoreDescriptor>> descriptorsSupplier, boolean sessionExpired) {
try {
overseer.close();
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
- closeOutstandingElections(descriptorsSupplier);
+ closeOutstandingElections(descriptorsSupplier, sessionExpired);
markAllAsNotLeader(descriptorsSupplier);
}
@@ -419,6 +421,8 @@ public class ZkController implements Closeable {
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
clearZkCollectionTerms();
try {
+ // Remove the live node in case it is still there
+ removeEphemeralLiveNode();
// recreate our watchers first so that they exist even on any problems below
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -663,16 +667,17 @@ public class ZkController implements Closeable {
return sysPropsCacher;
}
- private void closeOutstandingElections(final Supplier<List<CoreDescriptor>> registerOnReconnect) {
+ private void closeOutstandingElections(
+ final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean sessionExpired) {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
- closeExistingElectionContext(descriptor);
+ closeExistingElectionContext(descriptor, sessionExpired);
}
}
}
- private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
+ private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessionExpired) {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
@@ -682,7 +687,11 @@ public class ZkController implements Closeable {
if (prevContext != null) {
prevContext.close();
- electionContexts.remove(contextKey);
+ // Only remove the election contexts if the session expired, otherwise the ephemeral nodes
+ // will still exist
+ if (sessionExpired) {
+ electionContexts.remove(contextKey);
+ }
}
return contextKey;
@@ -1209,6 +1218,19 @@ public class ZkController implements Closeable {
} catch (NoNodeException e) {
}
+ cc.nodeRoles
+ .getRoles()
+ .forEach(
+ (role, mode) -> {
+ try {
+ zkClient.delete(
+ NodeRoles.getZNodeForRoleMode(role, mode) + "/" + nodeName, -1, true);
+ } catch (KeeperException e) {
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
}
public String getNodeName() {
@@ -1740,6 +1762,14 @@ public class ZkController implements Closeable {
cd.getCloudDescriptor().setLastPublished(state);
}
DocCollection coll = zkStateReader.getCollection(collection);
+ // extra handling for PRS, we need to write the PRS entries from this node directly,
+ // as overseer does not and should not handle those entries
+ if (coll != null && coll.isPerReplicaState() && coreNodeName != null) {
+ PerReplicaStates perReplicaStates =
+ PerReplicaStatesOps.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+ PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
+ .persist(coll.getZNode(), zkClient);
+ }
if (forcePublish || updateStateDotJson(coll, coreNodeName)) {
if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
distributedClusterStateUpdater.doSingleStateUpdate(
@@ -1751,14 +1781,6 @@ public class ZkController implements Closeable {
overseerJobQueue.offer(m);
}
}
- // extra handling for PRS, we need to write the PRS entries from this node directly,
- // as overseer does not and should not handle those entries
- if (coll != null && coll.isPerReplicaState() && coreNodeName != null) {
- PerReplicaStates perReplicaStates =
- PerReplicaStatesOps.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
- PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
- .persist(coll.getZNode(), zkClient);
- }
} finally {
MDCLoggingContext.clear();
}
@@ -1944,7 +1966,7 @@ public class ZkController implements Closeable {
// before becoming available, make sure we are not live and active
// this also gets us our assigned shard id if it was not specified
try {
- checkStateInZk(cd);
+ checkStateInZk(cd, null);
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
@@ -2001,7 +2023,7 @@ public class ZkController implements Closeable {
return !replica.getNodeName().equals(getNodeName());
}
- private void checkStateInZk(CoreDescriptor cd)
+ private void checkStateInZk(CoreDescriptor cd, Replica.State state)
throws InterruptedException, NotInClusterStateException {
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
String nodeName = cloudDesc.getCoreNodeName();
@@ -2037,6 +2059,16 @@ public class ZkController implements Closeable {
+ ", ignore the exception if the replica was deleted");
return false;
}
+ if (state != null && !state.equals(replica.getState())) {
+ errorMessage.set(
+ "coreNodeName "
+ + coreNodeName
+ + " does not have the expected state "
+ + state
+ + ", found state was: "
+ + replica.getState());
+ return false;
+ }
return true;
});
} catch (TimeoutException e) {
@@ -2120,10 +2152,15 @@ public class ZkController implements Closeable {
if (!isLeader && !SKIP_AUTO_RECOVERY) {
if (!getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
log.debug(
- "Term of replica {} is already less than leader, so not waiting for leader to see down state.",
+ "Term of replica {} is already less than leader, so not waiting for leader to see down state."
+ + " Instead, make sure we can see the down state in Zookeeper.",
myCoreNodeName);
+ try {
+ checkStateInZk(descriptor, Replica.State.DOWN);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
} else {
-
if (log.isInfoEnabled()) {
log.info(
"replica={} is making a best effort attempt to wait for leader={} to see it's DOWN state.",
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 83f8885325b..e58e0e5ec32 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -406,21 +406,21 @@ public class ZkShardTerms implements AutoCloseable {
Watcher watcher =
event -> {
// Don't do anything if we are closed
- if (isClosed.get()) {
+ if (isClosed.get() || !event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
return;
}
// session events are not change events, and do not remove the watcher
if (Watcher.Event.EventType.None == event.getType()) {
return;
}
+ // Some events may be missed during registering a watcher, so it is safer to refresh terms
+ // after registering watcher
retryRegisterWatcher();
- // if term node is deleted, refresh cannot possibly succeed
- if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
- return;
+ // Only refresh the data if the node was created or its data changed.
+ if (Watcher.Event.EventType.NodeCreated == event.getType()
+ || Watcher.Event.EventType.NodeDataChanged == event.getType()) {
+ refreshTerms();
}
- // Some events may be missed during register a watcher, so it is safer to refresh terms
- // after registering watcher
- refreshTerms();
};
try {
// exists operation is faster than getData operation
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 213a5db6d6d..152a5742bd9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -295,7 +295,13 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
zkClient
.getCuratorFramework()
.getConnectionStateListenable()
- .addListener((OnDisconnect) hasDisconnected::countDown);
+ .addListener(
+ (OnDisconnect)
+ ((sessionExpired) -> {
+ if (sessionExpired) {
+ hasDisconnected.countDown();
+ }
+ }));
long sessionId = zkClient.getZkSessionId();
zkServer.expire(sessionId);
hasDisconnected.await(10, TimeUnit.SECONDS);
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/BeforeReconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/BeforeReconnect.java
deleted file mode 100644
index 6e028056013..00000000000
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/BeforeReconnect.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.common.cloud;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-
-public interface BeforeReconnect extends ConnectionStateListener {
- public void command();
-
- @Override
- default void stateChanged(CuratorFramework client, ConnectionState newState) {
- if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
- command();
- }
- }
-}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
index f33013c5185..cd24a2ec4d1 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
@@ -21,12 +21,12 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
public interface OnDisconnect extends ConnectionStateListener {
- public void command();
+ public void command(boolean sessionExpired);
@Override
default void stateChanged(CuratorFramework client, ConnectionState newState) {
- if (newState == ConnectionState.LOST) {
- command();
+ if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
+ command(newState == ConnectionState.LOST);
}
}
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 1863d797938..906a8368c35 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.common.cloud;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
@@ -31,16 +30,10 @@ import org.apache.curator.framework.state.ConnectionStateListener;
public interface OnReconnect extends ConnectionStateListener {
void command();
- AtomicBoolean sessionEnded = new AtomicBoolean(false);
-
@Override
default void stateChanged(CuratorFramework client, ConnectionState newState) {
if (ConnectionState.RECONNECTED.equals(newState)) {
- if (sessionEnded.getAndSet(false)) {
- command();
- }
- } else if (ConnectionState.LOST == newState) {
- sessionEnded.set(true);
+ command();
}
}
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 9ae720cb180..d8abea5529d 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -120,7 +120,7 @@ public class SolrZkClient implements Closeable {
builder.zkCredentialsProvider,
builder.aclProvider,
builder.onReconnect,
- builder.beforeReconnect,
+ builder.onDisconnect,
builder.higherLevelIsClosed,
builder.minStateByteLenForCompression,
builder.compressor,
@@ -135,7 +135,7 @@ public class SolrZkClient implements Closeable {
ZkCredentialsProvider zkCredentialsProvider,
ACLProvider aclProvider,
final OnReconnect onReconnect,
- BeforeReconnect beforeReconnect,
+ OnDisconnect onDisconnect,
IsClosed higherLevelIsClosed,
int minStateByteLenForCompression,
Compressor compressor,
@@ -205,10 +205,10 @@ public class SolrZkClient implements Closeable {
.getConnectionStateListenable()
.addListener(onReconnect, zkConnectionListenerCallbackExecutor);
}
- if (beforeReconnect != null) {
+ if (onDisconnect != null) {
client
.getConnectionStateListenable()
- .addListener(beforeReconnect, zkConnectionListenerCallbackExecutor);
+ .addListener(onDisconnect, zkConnectionListenerCallbackExecutor);
}
client.start();
try {
@@ -1257,7 +1257,7 @@ public class SolrZkClient implements Closeable {
public int zkClientTimeout = SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
public int zkClientConnectTimeout = SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
public OnReconnect onReconnect;
- public BeforeReconnect beforeReconnect;
+ public OnDisconnect onDisconnect;
public ZkCredentialsProvider zkCredentialsProvider;
public ACLProvider aclProvider;
public IsClosed higherLevelIsClosed;
@@ -1313,8 +1313,8 @@ public class SolrZkClient implements Closeable {
return this;
}
- public Builder withBeforeConnect(BeforeReconnect beforeReconnect) {
- this.beforeReconnect = beforeReconnect;
+ public Builder withDisconnectListener(OnDisconnect onDisconnect) {
+ this.onDisconnect = onDisconnect;
return this;
}