This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 291f9b3  SOLR-15288: Hardening NODEDOWN event event in PRS mode
291f9b3 is described below

commit 291f9b32c779f471fcd9f04648e017efbd9b90a8
Author: Noble Paul <[email protected]>
AuthorDate: Mon Apr 5 23:27:31 2021 +1000

    SOLR-15288: Hardening NODEDOWN event event in PRS mode
---
 .../solr/cloud/DistributedClusterStateUpdater.java |   2 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   |   2 +-
 .../apache/solr/cloud/overseer/NodeMutator.java    |  18 ++-
 .../org/apache/solr/cloud/NodeMutatorTest.java     |   2 +-
 .../test/org/apache/solr/cloud/OverseerTest.java   |   2 +-
 .../client/solrj/impl/CloudSolrClientTest.java     |  43 ------
 .../cloud/PerReplicaStatesIntegrationTest.java     | 158 +++++++++++++++++++++
 7 files changed, 177 insertions(+), 50 deletions(-)

diff --git 
a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java 
b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
index e48b7ce..9583e5b 100644
--- 
a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
+++ 
b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -795,7 +795,7 @@ public class DistributedClusterStateUpdater {
     @Override
     public void computeUpdates(ClusterState clusterState) {
       final DocCollection docCollection = 
clusterState.getCollectionOrNull(collectionName);
-      Optional<ZkWriteCommand> result = docCollection != null ? 
NodeMutator.computeCollectionUpdate(nodeName, collectionName, docCollection) : 
Optional.empty();
+      Optional<ZkWriteCommand> result = docCollection != null ? 
NodeMutator.computeCollectionUpdate(nodeName, collectionName, docCollection, 
null) : Optional.empty();
 
       if (docCollection == null) {
         // This is possible but should be rare. Logging warn in case it is 
seen often and likely a sign of another issue
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java 
b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index e131d8c..47fa0cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -508,7 +508,7 @@ public class Overseer implements SolrCloseable {
             }
             break;
           case DOWNNODE:
-            return new NodeMutator().downNode(clusterState, message);
+            return new 
NodeMutator(getSolrCloudManager()).downNode(clusterState, message);
           default:
             throw new RuntimeException("unknown operation:" + operation + " 
contents:" + message.getProperties());
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java 
b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index adf146d..a54eac9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -25,11 +25,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.slf4j.Logger;
@@ -38,6 +41,10 @@ import org.slf4j.LoggerFactory;
 public class NodeMutator {
 
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  protected SolrZkClient zkClient;
+  public NodeMutator(SolrCloudManager cloudManager) {
+    zkClient = SliceMutator.getZkClient(cloudManager);
+  }
 
   public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps 
message) {
     String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
@@ -51,7 +58,7 @@ public class NodeMutator {
       String collectionName = entry.getKey();
       DocCollection docCollection = entry.getValue();
 
-      Optional<ZkWriteCommand> zkWriteCommand = 
computeCollectionUpdate(nodeName, collectionName, docCollection);
+      Optional<ZkWriteCommand> zkWriteCommand = 
computeCollectionUpdate(nodeName, collectionName, docCollection, zkClient);
 
       if (zkWriteCommand.isPresent()) {
         zkWriteCommands.add(zkWriteCommand.get());
@@ -67,7 +74,7 @@ public class NodeMutator {
    *    The returned write command might be for per replica state updates or 
for an update to state.json, depending on the
    *    configuration of the collection.
    */
-  public static Optional<ZkWriteCommand> computeCollectionUpdate(String 
nodeName, String collectionName, DocCollection docCollection) {
+  public static Optional<ZkWriteCommand> computeCollectionUpdate(String 
nodeName, String collectionName, DocCollection docCollection, SolrZkClient 
client) {
     boolean needToUpdateCollection = false;
     List<String> downedReplicas = new ArrayList<>();
     Map<String,Slice> slicesCopy = new 
LinkedHashMap<>(docCollection.getSlicesMap());
@@ -99,8 +106,13 @@ public class NodeMutator {
 
     if (needToUpdateCollection) {
       if (docCollection.isPerReplicaState()) {
+        PerReplicaStates prs = client == null ?
+            docCollection.getPerReplicaStates() :
+            PerReplicaStates.fetch(docCollection.getZNode(), client, 
docCollection.getPerReplicaStates());
+
         return Optional.of(new ZkWriteCommand(collectionName, 
docCollection.copyWithSlices(slicesCopy),
-            PerReplicaStatesOps.downReplicas(downedReplicas, 
docCollection.getPerReplicaStates()), false));
+            PerReplicaStatesOps.downReplicas(downedReplicas,
+                prs), false));
       } else {
         return Optional.of(new ZkWriteCommand(collectionName, 
docCollection.copyWithSlices(slicesCopy)));
       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java 
b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index e9401d0..8fa70d8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -43,7 +43,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
 
   @Test
   public void downNodeReportsAllImpactedCollectionsAndNothingElse() throws 
IOException {
-    NodeMutator nm = new NodeMutator();
+    NodeMutator nm = new NodeMutator(null);
 
     //Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on 
node2
     //Collection2: 1 shard X 1 replica = replica1 on node2
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java 
b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index a7c8e48..c95b72f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -532,7 +532,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
         }
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, 
OverseerAction.DOWNNODE.toLower(),
             ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
-        List<ZkWriteCommand> commands = new 
NodeMutator().downNode(reader.getClusterState(), m);
+        List<ZkWriteCommand> commands = new 
NodeMutator(null).downNode(reader.getClusterState(), m);
 
         ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
 
diff --git 
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
 
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index bc27bac..e6f1d10 100644
--- 
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ 
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -47,7 +47,6 @@ import 
org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
@@ -61,10 +60,8 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
@@ -84,8 +81,6 @@ import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
-
 
 /**
  * This test would be faster if we simulated the zk state instead.
@@ -1056,42 +1051,4 @@ public class CloudSolrClientTest extends 
SolrCloudTestCase {
     assertEquals("This should be OK", 0, response.getStatus());
   }
 
-  public void testPerReplicaStateCollection() throws Exception {
-    CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 
1)
-        .process(cluster.getSolrClient());
-
-    String testCollection = "perReplicaState_test";
-    int liveNodes = cluster.getJettySolrRunners().size();
-    CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
-        .setPerReplicaState(Boolean.TRUE)
-        .process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(testCollection, 2, 4);
-    final SolrClient clientUnderTest = getRandomClient();
-    final SolrPingResponse response = clientUnderTest.ping(testCollection);
-    assertEquals("This should be OK", 0, response.getStatus());
-    DocCollection c = 
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
-    c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
-    PerReplicaStates prs = 
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), 
cluster.getZkClient(), null);
-    assertEquals(4, prs.states.size());
-
-    // Now let's do an add replica
-    CollectionAdminRequest
-        .addReplicaToShard(testCollection, "shard1")
-        .process(cluster.getSolrClient());
-    prs = 
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), 
cluster.getZkClient(), null);
-    assertEquals(5, prs.states.size());
-
-    testCollection = "perReplicaState_testv2";
-    new V2Request.Builder("/collections")
-        .withMethod(POST)
-        .withPayload("{create: {name: perReplicaState_testv2, config : conf, 
numShards : 2, nrtReplicas : 2, perReplicaState : true}}")
-        .build()
-        .process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(testCollection, 2, 4);
-    c = 
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
-    c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
-    prs = 
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), 
cluster.getZkClient(), null);
-    assertEquals(4, prs.states.size());
-  }
-
 }
diff --git 
a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
 
b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
new file mode 100644
index 0000000..94ae988
--- /dev/null
+++ 
b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.util.LogLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
+
+/**
+ * This test would be faster if we simulated the zk state instead.
+ */
+@Slow
+@LogLevel("org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
+public class PerReplicaStatesIntegrationTest extends SolrCloudTestCase {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+  public void testPerReplicaStateCollection() throws Exception {
+
+    String testCollection = "perReplicaState_test";
+
+    MiniSolrCloudCluster cluster =
+        configureCluster(3)
+            .addConfig("conf", 
getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+            .withJettyConfig(jetty -> jetty.enableV2(true))
+            .configure();
+    try {
+      int liveNodes = cluster.getJettySolrRunners().size();
+      CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
+          .setPerReplicaState(Boolean.TRUE)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(testCollection, 2, 4);
+      final SolrClient clientUnderTest = cluster.getSolrClient();
+      final SolrPingResponse response = clientUnderTest.ping(testCollection);
+      assertEquals("This should be OK", 0, response.getStatus());
+      DocCollection c = 
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+      c.forEachReplica((s, replica) -> 
assertNotNull(replica.getReplicaState()));
+      PerReplicaStates prs = 
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), 
cluster.getZkClient(), null);
+      assertEquals(4, prs.states.size());
+      JettySolrRunner jsr = cluster.startJettySolrRunner();
+      // Now let's do an add replica
+      CollectionAdminRequest
+          .addReplicaToShard(testCollection, "shard1")
+          .process(cluster.getSolrClient());
+      prs = 
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), 
cluster.getZkClient(), null);
+      assertEquals(5, prs.states.size());
+
+      testCollection = "perReplicaState_testv2";
+      new V2Request.Builder("/collections")
+          .withMethod(POST)
+          .withPayload("{create: {name: perReplicaState_testv2, config : conf, 
numShards : 2, nrtReplicas : 2, perReplicaState : true, maxShardsPerNode : 5}}")
+          .build()
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(testCollection, 2, 4);
+      c = 
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+      c.forEachReplica((s, replica) -> 
assertNotNull(replica.getReplicaState()));
+      prs = 
PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), 
cluster.getZkClient(), null);
+      assertEquals(4, prs.states.size());
+    }finally {
+      cluster.shutdown();
+    }
+
+
+  }
+
+  public void testRestart() throws Exception {
+    String testCollection = "prs_restart_test";
+    MiniSolrCloudCluster cluster =
+        configureCluster(1)
+            .addConfig("conf", 
getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+            .withJettyConfig(jetty -> jetty.enableV2(true))
+            .configure();
+    try {
+      CollectionAdminRequest.createCollection(testCollection, "conf", 1, 1)
+          .setPerReplicaState(Boolean.TRUE)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(testCollection, 1, 1);
+
+      DocCollection c = 
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+      c.forEachReplica((s, replica) -> 
assertNotNull(replica.getReplicaState()));
+      String collectionPath = ZkStateReader.getCollectionPath(testCollection);
+      PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, 
SolrCloudTestCase.cluster.getZkClient(), null);
+      assertEquals(1, prs.states.size());
+
+      JettySolrRunner jsr = cluster.startJettySolrRunner();
+      assertEquals(2,cluster.getJettySolrRunners().size());
+
+      // Now let's do an add replica
+      CollectionAdminRequest
+          .addReplicaToShard(testCollection, "shard1")
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(testCollection, 1, 2);
+      prs = PerReplicaStates.fetch(collectionPath, 
SolrCloudTestCase.cluster.getZkClient(), null);
+      assertEquals(2, prs.states.size());
+      c = 
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+      prs.states.forEachEntry((s, state) -> assertEquals(Replica.State.ACTIVE, 
state.state));
+
+      String replicaName = null;
+      for (Replica r : c.getSlice("shard1").getReplicas()) {
+        if(r.getNodeName() .equals(jsr.getNodeName())) {
+          replicaName = r.getName();
+        }
+      }
+
+      if(replicaName != null) {
+        if(log.isInfoEnabled()) {
+          log.info("restarting the node : {}, state.json v: {} downreplica 
:{}", jsr.getNodeName(), c.getZNodeVersion(), replicaName);
+        }
+        jsr.stop();
+        c = 
cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+        if(log.isInfoEnabled()) {
+          log.info("after down node, state.json v: {}", c.getZNodeVersion());
+        }
+        prs =  PerReplicaStates.fetch(collectionPath, 
SolrCloudTestCase.cluster.getZkClient(), null);
+        PerReplicaStates.State st = prs.get(replicaName);
+        assertNotEquals(Replica.State.ACTIVE, st.state);
+        jsr.start();
+        cluster.waitForActiveCollection(testCollection, 1, 2);
+        prs =  PerReplicaStates.fetch(collectionPath, 
SolrCloudTestCase.cluster.getZkClient(), null);
+        prs.states.forEachEntry((s, state) -> 
assertEquals(Replica.State.ACTIVE, state.state));
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+}

Reply via email to