This is an automated email from the ASF dual-hosted git repository.

tflobbe pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new fbf0980dba4 SOLR-7609: Validate the version is present on follower 
replicas when adding documents (#1504)
fbf0980dba4 is described below

commit fbf0980dba4118c17f658f5a0b79f929799910c1
Author: Alex <[email protected]>
AuthorDate: Tue Apr 25 09:59:39 2023 -0700

    SOLR-7609: Validate the version is present on follower replicas when adding 
documents (#1504)
    
    * Add the version check on additions to fail in case we are not leader and 
version = 0. (to match delete flows)
    * Change error status from BAD_REQUEST to INVALID_STATE to allow for 
retries.
    * Remove a 'cmd' variable - this is just minor readability refactoring, I 
tried to avoid changing the code as much as possible
    * Update the ShardSplitTest to keep track of exceptions happening during 
the concurrent adds and deletes and fail if needed.
    * Fix bad null validation
---
 solr/CHANGES.txt                                   |  2 +
 .../processor/DistributedUpdateProcessor.java      | 41 ++++++++++---------
 .../processor/DistributedZkUpdateProcessor.java    | 28 ++++++-------
 .../solr/cloud/api/collections/ShardSplitTest.java | 46 ++++++++++++++--------
 .../test/org/apache/solr/search/TestRecovery.java  |  2 +-
 5 files changed, 63 insertions(+), 56 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5e642d507fe..4f35b5c1fcd 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -103,6 +103,8 @@ Bug Fixes
 
 * SOLR-16755: bin/solr's '-noprompt' option no longer works for examples 
(hossman, janhoy, Houston Putman)
 
+* SOLR-7609: Internal update requests should fail back to the client in some 
edge cases for shard splits. Use HTTP status 510 so the client can retry the 
operation. (Alex Deparvu, David Smiley, Tomás Fernández Löbbe)
+
 Dependency Upgrades
 ---------------------
 * PR#1494: Upgrade forbiddenapis to 3.5 (Uwe Schindler)
diff --git 
a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
 
b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 636a2c59240..35ec2153fbd 100644
--- 
a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ 
b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -151,8 +151,6 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
   /** Number of times requests from leaders to followers can be retried */
   protected final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
 
-  protected UpdateCommand updateCommand; // the current command this processor 
is working on.
-
   protected final Replica.Type replicaType;
 
   public DistributedUpdateProcessor(
@@ -331,7 +329,8 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
 
     boolean isReplayOrPeersync =
         (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 
0;
-    boolean leaderLogic = isLeader && !isReplayOrPeersync;
+    boolean leaderLogic =
+        leaderLogicWithVersionIntegrityCheck(isReplayOrPeersync, isLeader, 
versionOnUpdate);
     boolean forwardedFromCollection = 
cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;
 
     VersionBucket bucket = vinfo.bucket(bucketHash);
@@ -824,8 +823,6 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
 
     assert TestInjection.injectFailUpdateRequests();
 
-    updateCommand = cmd;
-
     if (!cmd.isDeleteById()) {
       doDeleteByQuery(cmd);
     } else {
@@ -968,16 +965,13 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
 
     boolean isReplayOrPeersync =
         (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 
0;
-    boolean leaderLogic = isLeader && !isReplayOrPeersync;
-
-    if (!leaderLogic && versionOnUpdate == 0) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on 
update from leader");
-    }
+    boolean leaderLogic =
+        leaderLogicWithVersionIntegrityCheck(isReplayOrPeersync, isLeader, 
versionOnUpdate);
 
     vinfo.blockUpdates();
     try {
 
-      doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync);
+      doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync, 
leaderLogic);
 
       // since we don't know which documents were deleted, the easiest thing 
to do is to invalidate
       // all real-time caches (i.e. UpdateLog) which involves also getting a 
new version of the
@@ -999,10 +993,12 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
   }
 
   private void doLocalDeleteByQuery(
-      DeleteUpdateCommand cmd, long versionOnUpdate, boolean 
isReplayOrPeersync)
+      DeleteUpdateCommand cmd,
+      long versionOnUpdate,
+      boolean isReplayOrPeersync,
+      boolean leaderLogic)
       throws IOException {
     if (versionsStored) {
-      final boolean leaderLogic = isLeader && !isReplayOrPeersync;
       if (leaderLogic) {
         long version = vinfo.getNewClock();
         cmd.setVersion(-version);
@@ -1034,7 +1030,6 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
   // internal helper method to setup request by processors who use this class.
   // NOTE: not called by this class!
   void setupRequest(UpdateCommand cmd) {
-    updateCommand = cmd;
     isLeader = getNonZkLeaderAssumption(req);
   }
 
@@ -1073,13 +1068,10 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
 
     boolean isReplayOrPeersync =
         (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 
0;
-    boolean leaderLogic = isLeader && !isReplayOrPeersync;
+    boolean leaderLogic =
+        leaderLogicWithVersionIntegrityCheck(isReplayOrPeersync, isLeader, 
versionOnUpdate);
     boolean forwardedFromCollection = 
cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;
 
-    if (!leaderLogic && versionOnUpdate == 0) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on 
update from leader");
-    }
-
     VersionBucket bucket = vinfo.bucket(bucketHash);
 
     vinfo.lockForUpdate();
@@ -1206,13 +1198,20 @@ public class DistributedUpdateProcessor extends 
UpdateRequestProcessor {
     }
   }
 
+  private static boolean leaderLogicWithVersionIntegrityCheck(
+      boolean isReplayOrPeersync, boolean isLeader, long versionOnUpdate) {
+    boolean leaderLogic = isLeader && !isReplayOrPeersync;
+    if (!leaderLogic && versionOnUpdate == 0) {
+      throw new SolrException(ErrorCode.INVALID_STATE, "missing _version_ on 
update from leader");
+    }
+    return leaderLogic;
+  }
+
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
 
     assert TestInjection.injectFailUpdateRequests();
 
-    updateCommand = cmd;
-
     // replica type can only be NRT in standalone mode
     // NRT replicas will always commit
     doLocalCommit(cmd);
diff --git 
a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
 
b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 843dbde6724..202eac239bd 100644
--- 
a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ 
b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -164,11 +164,9 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
       throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection 
+ " is read-only.");
     }
 
-    updateCommand = cmd;
-
     List<SolrCmdDistributor.Node> nodes = null;
     Replica leaderReplica = null;
-    zkCheck();
+    zkCheck(cmd);
     try {
       leaderReplica =
           zkController.getZkStateReader().getLeaderRetry(collection, 
cloudDesc.getShardId());
@@ -425,7 +423,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
 
   @Override
   protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
-    zkCheck();
+    zkCheck(cmd);
 
     // NONE: we are the first to receive this deleteByQuery
     //       - it must be forwarded to the leader of every shard
@@ -686,11 +684,10 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
 
   @Override
   void setupRequest(UpdateCommand cmd) {
-    updateCommand = cmd;
-    zkCheck();
+    zkCheck(cmd);
     if (cmd instanceof AddUpdateCommand) {
       AddUpdateCommand acmd = (AddUpdateCommand) cmd;
-      nodes = setupRequest(acmd.getIndexedIdStr(), 
acmd.getSolrInputDocument());
+      nodes = setupRequest(acmd.getIndexedIdStr(), 
acmd.getSolrInputDocument(), null, cmd);
     } else if (cmd instanceof DeleteUpdateCommand) {
       DeleteUpdateCommand dcmd = (DeleteUpdateCommand) cmd;
       nodes =
@@ -699,16 +696,13 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
               null,
               (null != dcmd.getRoute()
                   ? dcmd.getRoute()
-                  : req.getParams().get(ShardParams._ROUTE_)));
+                  : req.getParams().get(ShardParams._ROUTE_)),
+              cmd);
     }
   }
 
-  protected List<SolrCmdDistributor.Node> setupRequest(String id, 
SolrInputDocument doc) {
-    return setupRequest(id, doc, null);
-  }
-
   protected List<SolrCmdDistributor.Node> setupRequest(
-      String id, SolrInputDocument doc, String route) {
+      String id, SolrInputDocument doc, String route, UpdateCommand 
updateCommand) {
     // if we are in zk mode...
 
     assert TestInjection.injectUpdateRandomPause();
@@ -778,7 +772,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
         }
       }
 
-      doDefensiveChecks(phase);
+      doDefensiveChecks(phase, updateCommand);
 
       // if request is coming from another collection then we want it to be 
sent to all replicas
       // even if its phase is FROMLEADER
@@ -892,7 +886,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
   private List<SolrCmdDistributor.Node> getCollectionUrls(
       String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
     final DocCollection docCollection = 
clusterState.getCollectionOrNull(collection);
-    if (collection == null || docCollection.getSlicesMap() == null) {
+    if (docCollection == null || docCollection.getSlicesMap() == null) {
       throw new ZooKeeperException(
           SolrException.ErrorCode.BAD_REQUEST, "Could not find collection in 
zk: " + clusterState);
     }
@@ -1145,7 +1139,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
     return nodes;
   }
 
-  private void doDefensiveChecks(DistribPhase phase) {
+  private void doDefensiveChecks(DistribPhase phase, UpdateCommand 
updateCommand) {
     boolean isReplayOrPeersync =
         (updateCommand.getFlags() & (UpdateCommand.REPLAY | 
UpdateCommand.PEER_SYNC)) != 0;
     if (isReplayOrPeersync) return;
@@ -1448,7 +1442,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
     }
   }
 
-  private void zkCheck() {
+  private void zkCheck(UpdateCommand updateCommand) {
 
     // Streaming updates can delay shutdown and cause big update reorderings 
(new streams can't be
     // initiated, but existing streams carry on).  This is why we check if the 
CC is shutdown.
diff --git 
a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java 
b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 9eccd8eefc8..f252c23e901 100644
--- 
a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ 
b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -30,6 +30,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -865,6 +866,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
     commit();
 
+    List<String> errors = new CopyOnWriteArrayList<>();
     Thread indexThread =
         new Thread(
             () -> {
@@ -879,21 +881,26 @@ public class ShardSplitTest extends 
BasicDistributedZkTest {
                   indexAndUpdateCount(
                       router, ranges, docCounts, String.valueOf(id), id, 
documentIds);
                   Thread.sleep(sleep);
-                  if (usually(random)) {
-                    String delId = String.valueOf(random.nextInt(id - 101 + 1) 
+ 101);
-                    if (deleted.contains(delId)) continue;
-                    try {
-                      deleteAndUpdateCount(router, ranges, docCounts, delId);
-                      deleted.add(delId);
-                      documentIds.remove(String.valueOf(delId));
-                    } catch (Exception e) {
-                      log.error("Exception while deleting docs", e);
-                    }
-                  }
                 } catch (Exception e) {
                   log.error("Exception while adding doc id = {}", id, e);
                   // do not select this id for deletion ever
                   deleted.add(String.valueOf(id));
+                  errors.add(e.getMessage());
+                }
+
+                if (usually(random)) {
+                  String delId = String.valueOf(random.nextInt(id - 101 + 1) + 
101);
+                  if (deleted.contains(delId)) {
+                    continue;
+                  }
+                  try {
+                    deleteAndUpdateCount(router, ranges, docCounts, delId);
+                    deleted.add(delId);
+                    documentIds.remove(delId);
+                  } catch (Exception e) {
+                    log.error("Exception while deleting doc id = {}", delId, 
e);
+                    errors.add(e.getMessage());
+                  }
                 }
               }
             });
@@ -924,6 +931,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       }
     }
 
+    assertTrue(
+        "Errors present while concurrently adding and removing docs " + 
errors, errors.isEmpty());
+
     waitForRecoveriesToFinish(true);
     checkDocCountsAndShardStates(docCounts, numReplicas, documentIds);
   }
@@ -1342,17 +1352,19 @@ public class ShardSplitTest extends 
BasicDistributedZkTest {
     Map<String, SolrDocument> shard11Docs = new HashMap<>();
     for (int i = 0; i < response.getResults().size(); i++) {
       SolrDocument document = response.getResults().get(i);
-      idVsVersion.put(
-          document.getFieldValue("id").toString(), 
document.getFieldValue("_version_").toString());
-      SolrDocument old = 
shard10Docs.put(document.getFieldValue("id").toString(), document);
+      String id = document.getFieldValue("id").toString();
+      Object version = document.getFieldValue("_version_");
+      assertNotNull("doc " + id + " has null _version_ field", version);
+      idVsVersion.put(id, version.toString());
+      SolrDocument old = shard10Docs.put(id, document);
       if (old != null) {
         log.error(
             "EXTRA: ID: {} on shard1_0. Old version: {} new version: {}",
-            document.getFieldValue("id"),
+            id,
             old.getFieldValue("_version_"),
-            document.getFieldValue("_version_"));
+            version);
       }
-      found.add(document.getFieldValue("id").toString());
+      found.add(id);
     }
     for (int i = 0; i < response2.getResults().size(); i++) {
       SolrDocument document = response2.getResults().get(i);
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java 
b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index a868acbdadb..9fd46e00c58 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -1969,7 +1969,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
   }
 
   static class VersionProvider {
-    private static long version = 0;
+    private static long version = 1;
 
     static String getNextVersion() {
       return Long.toString(version++);

Reply via email to