This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 291f9b3 SOLR-15288: Hardening NODEDOWN event in PRS mode
291f9b3 is described below
commit 291f9b32c779f471fcd9f04648e017efbd9b90a8
Author: Noble Paul <[email protected]>
AuthorDate: Mon Apr 5 23:27:31 2021 +1000
SOLR-15288: Hardening NODEDOWN event in PRS mode
---
.../solr/cloud/DistributedClusterStateUpdater.java | 2 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 2 +-
.../apache/solr/cloud/overseer/NodeMutator.java | 18 ++-
.../org/apache/solr/cloud/NodeMutatorTest.java | 2 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 2 +-
.../client/solrj/impl/CloudSolrClientTest.java | 43 ------
.../cloud/PerReplicaStatesIntegrationTest.java | 158 +++++++++++++++++++++
7 files changed, 177 insertions(+), 50 deletions(-)
diff --git
a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
index e48b7ce..9583e5b 100644
---
a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
+++
b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -795,7 +795,7 @@ public class DistributedClusterStateUpdater {
@Override
public void computeUpdates(ClusterState clusterState) {
final DocCollection docCollection =
clusterState.getCollectionOrNull(collectionName);
- Optional<ZkWriteCommand> result = docCollection != null ?
NodeMutator.computeCollectionUpdate(nodeName, collectionName, docCollection) :
Optional.empty();
+ Optional<ZkWriteCommand> result = docCollection != null ?
NodeMutator.computeCollectionUpdate(nodeName, collectionName, docCollection,
null) : Optional.empty();
if (docCollection == null) {
// This is possible but should be rare. Logging warn in case it is
seen often and likely a sign of another issue
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index e131d8c..47fa0cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -508,7 +508,7 @@ public class Overseer implements SolrCloseable {
}
break;
case DOWNNODE:
- return new NodeMutator().downNode(clusterState, message);
+ return new
NodeMutator(getSolrCloudManager()).downNode(clusterState, message);
default:
throw new RuntimeException("unknown operation:" + operation + "
contents:" + message.getProperties());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index adf146d..a54eac9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -25,11 +25,14 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
@@ -38,6 +41,10 @@ import org.slf4j.LoggerFactory;
public class NodeMutator {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected SolrZkClient zkClient;
+ public NodeMutator(SolrCloudManager cloudManager) {
+ zkClient = SliceMutator.getZkClient(cloudManager);
+ }
public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps
message) {
String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
@@ -51,7 +58,7 @@ public class NodeMutator {
String collectionName = entry.getKey();
DocCollection docCollection = entry.getValue();
- Optional<ZkWriteCommand> zkWriteCommand =
computeCollectionUpdate(nodeName, collectionName, docCollection);
+ Optional<ZkWriteCommand> zkWriteCommand =
computeCollectionUpdate(nodeName, collectionName, docCollection, zkClient);
if (zkWriteCommand.isPresent()) {
zkWriteCommands.add(zkWriteCommand.get());
@@ -67,7 +74,7 @@ public class NodeMutator {
* The returned write command might be for per replica state updates or
for an update to state.json, depending on the
* configuration of the collection.
*/
- public static Optional<ZkWriteCommand> computeCollectionUpdate(String
nodeName, String collectionName, DocCollection docCollection) {
+ public static Optional<ZkWriteCommand> computeCollectionUpdate(String
nodeName, String collectionName, DocCollection docCollection, SolrZkClient
client) {
boolean needToUpdateCollection = false;
List<String> downedReplicas = new ArrayList<>();
Map<String,Slice> slicesCopy = new
LinkedHashMap<>(docCollection.getSlicesMap());
@@ -99,8 +106,13 @@ public class NodeMutator {
if (needToUpdateCollection) {
if (docCollection.isPerReplicaState()) {
+ PerReplicaStates prs = client == null ?
+ docCollection.getPerReplicaStates() :
+ PerReplicaStates.fetch(docCollection.getZNode(), client,
docCollection.getPerReplicaStates());
+
return Optional.of(new ZkWriteCommand(collectionName,
docCollection.copyWithSlices(slicesCopy),
- PerReplicaStatesOps.downReplicas(downedReplicas,
docCollection.getPerReplicaStates()), false));
+ PerReplicaStatesOps.downReplicas(downedReplicas,
+ prs), false));
} else {
return Optional.of(new ZkWriteCommand(collectionName,
docCollection.copyWithSlices(slicesCopy)));
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index e9401d0..8fa70d8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -43,7 +43,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
@Test
public void downNodeReportsAllImpactedCollectionsAndNothingElse() throws
IOException {
- NodeMutator nm = new NodeMutator();
+ NodeMutator nm = new NodeMutator(null);
//Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on
node2
//Collection2: 1 shard X 1 replica = replica1 on node2
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index a7c8e48..c95b72f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -532,7 +532,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerAction.DOWNNODE.toLower(),
ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
- List<ZkWriteCommand> commands = new
NodeMutator().downNode(reader.getClusterState(), m);
+ List<ZkWriteCommand> commands = new
NodeMutator(null).downNode(reader.getClusterState(), m);
ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
diff --git
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index bc27bac..e6f1d10 100644
---
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -47,7 +47,6 @@ import
org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.response.SolrPingResponse;
@@ -61,10 +60,8 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@@ -84,8 +81,6 @@ import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
-
/**
* This test would be faster if we simulated the zk state instead.
@@ -1056,42 +1051,4 @@ public class CloudSolrClientTest extends
SolrCloudTestCase {
assertEquals("This should be OK", 0, response.getStatus());
}
- public void testPerReplicaStateCollection() throws Exception {
- CollectionAdminRequest.createCollection("versions_collection", "conf", 2,
1)
- .process(cluster.getSolrClient());
-
- String testCollection = "perReplicaState_test";
- int liveNodes = cluster.getJettySolrRunners().size();
- CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
- .setPerReplicaState(Boolean.TRUE)
- .process(cluster.getSolrClient());
- cluster.waitForActiveCollection(testCollection, 2, 4);
- final SolrClient clientUnderTest = getRandomClient();
- final SolrPingResponse response = clientUnderTest.ping(testCollection);
- assertEquals("This should be OK", 0, response.getStatus());
- DocCollection c =
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
- c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
- PerReplicaStates prs =
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection),
cluster.getZkClient(), null);
- assertEquals(4, prs.states.size());
-
- // Now let's do an add replica
- CollectionAdminRequest
- .addReplicaToShard(testCollection, "shard1")
- .process(cluster.getSolrClient());
- prs =
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection),
cluster.getZkClient(), null);
- assertEquals(5, prs.states.size());
-
- testCollection = "perReplicaState_testv2";
- new V2Request.Builder("/collections")
- .withMethod(POST)
- .withPayload("{create: {name: perReplicaState_testv2, config : conf,
numShards : 2, nrtReplicas : 2, perReplicaState : true}}")
- .build()
- .process(cluster.getSolrClient());
- cluster.waitForActiveCollection(testCollection, 2, 4);
- c =
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
- c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
- prs =
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection),
cluster.getZkClient(), null);
- assertEquals(4, prs.states.size());
- }
-
}
diff --git
a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
new file mode 100644
index 0000000..94ae988
--- /dev/null
+++
b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.util.LogLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
+
+/**
+ * This test would be faster if we simulated the zk state instead.
+ */
+@Slow
+@LogLevel("org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
+public class PerReplicaStatesIntegrationTest extends SolrCloudTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+ public void testPerReplicaStateCollection() throws Exception {
+
+ String testCollection = "perReplicaState_test";
+
+ MiniSolrCloudCluster cluster =
+ configureCluster(3)
+ .addConfig("conf",
getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+ .withJettyConfig(jetty -> jetty.enableV2(true))
+ .configure();
+ try {
+ int liveNodes = cluster.getJettySolrRunners().size();
+ CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
+ .setPerReplicaState(Boolean.TRUE)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(testCollection, 2, 4);
+ final SolrClient clientUnderTest = cluster.getSolrClient();
+ final SolrPingResponse response = clientUnderTest.ping(testCollection);
+ assertEquals("This should be OK", 0, response.getStatus());
+ DocCollection c =
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ c.forEachReplica((s, replica) ->
assertNotNull(replica.getReplicaState()));
+ PerReplicaStates prs =
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection),
cluster.getZkClient(), null);
+ assertEquals(4, prs.states.size());
+ JettySolrRunner jsr = cluster.startJettySolrRunner();
+ // Now let's do an add replica
+ CollectionAdminRequest
+ .addReplicaToShard(testCollection, "shard1")
+ .process(cluster.getSolrClient());
+ prs =
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection),
cluster.getZkClient(), null);
+ assertEquals(5, prs.states.size());
+
+ testCollection = "perReplicaState_testv2";
+ new V2Request.Builder("/collections")
+ .withMethod(POST)
+ .withPayload("{create: {name: perReplicaState_testv2, config : conf,
numShards : 2, nrtReplicas : 2, perReplicaState : true, maxShardsPerNode : 5}}")
+ .build()
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(testCollection, 2, 4);
+ c =
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ c.forEachReplica((s, replica) ->
assertNotNull(replica.getReplicaState()));
+ prs =
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection),
cluster.getZkClient(), null);
+ assertEquals(4, prs.states.size());
+ }finally {
+ cluster.shutdown();
+ }
+
+
+ }
+
+ public void testRestart() throws Exception {
+ String testCollection = "prs_restart_test";
+ MiniSolrCloudCluster cluster =
+ configureCluster(1)
+ .addConfig("conf",
getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+ .withJettyConfig(jetty -> jetty.enableV2(true))
+ .configure();
+ try {
+ CollectionAdminRequest.createCollection(testCollection, "conf", 1, 1)
+ .setPerReplicaState(Boolean.TRUE)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(testCollection, 1, 1);
+
+ DocCollection c =
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ c.forEachReplica((s, replica) ->
assertNotNull(replica.getReplicaState()));
+ String collectionPath = ZkStateReader.getCollectionPath(testCollection);
+ PerReplicaStates prs = PerReplicaStates.fetch(collectionPath,
SolrCloudTestCase.cluster.getZkClient(), null);
+ assertEquals(1, prs.states.size());
+
+ JettySolrRunner jsr = cluster.startJettySolrRunner();
+ assertEquals(2,cluster.getJettySolrRunners().size());
+
+ // Now let's do an add replica
+ CollectionAdminRequest
+ .addReplicaToShard(testCollection, "shard1")
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(testCollection, 1, 2);
+ prs = PerReplicaStates.fetch(collectionPath,
SolrCloudTestCase.cluster.getZkClient(), null);
+ assertEquals(2, prs.states.size());
+ c =
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ prs.states.forEachEntry((s, state) -> assertEquals(Replica.State.ACTIVE,
state.state));
+
+ String replicaName = null;
+ for (Replica r : c.getSlice("shard1").getReplicas()) {
+ if(r.getNodeName() .equals(jsr.getNodeName())) {
+ replicaName = r.getName();
+ }
+ }
+
+ if(replicaName != null) {
+ if(log.isInfoEnabled()) {
+ log.info("restarting the node : {}, state.json v: {} downreplica
:{}", jsr.getNodeName(), c.getZNodeVersion(), replicaName);
+ }
+ jsr.stop();
+ c =
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ if(log.isInfoEnabled()) {
+ log.info("after down node, state.json v: {}", c.getZNodeVersion());
+ }
+ prs = PerReplicaStates.fetch(collectionPath,
SolrCloudTestCase.cluster.getZkClient(), null);
+ PerReplicaStates.State st = prs.get(replicaName);
+ assertNotEquals(Replica.State.ACTIVE, st.state);
+ jsr.start();
+ cluster.waitForActiveCollection(testCollection, 1, 2);
+ prs = PerReplicaStates.fetch(collectionPath,
SolrCloudTestCase.cluster.getZkClient(), null);
+ prs.states.forEachEntry((s, state) ->
assertEquals(Replica.State.ACTIVE, state.state));
+ }
+
+ } finally {
+ cluster.shutdown();
+ }
+
+ }
+}