This is an automated email from the ASF dual-hosted git repository.

broustant pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new cdeda0d  EncryptionRequestHandler supports encryption requests 
distribution. (#115)
cdeda0d is described below

commit cdeda0dadb19d269e5854aaa1502f660ae53b5ed
Author: Bruno Roustant <[email protected]>
AuthorDate: Mon Feb 10 09:42:34 2025 +0100

    EncryptionRequestHandler supports encryption requests distribution. (#115)
---
 encryption/build.gradle                            |  16 +-
 .../solr/encryption/EncryptionRequestHandler.java  | 268 ++++++++++++++++++---
 .../encryption/crypto/CharStreamEncrypter.java     |   3 +
 .../kms/KmsEncryptionRequestHandler.java           |  70 +++---
 .../solr/encryption/EncryptionDirectoryTest.java   | 112 +++++----
 .../solr/encryption/EncryptionHeavyLoadTest.java   |  31 +--
 .../encryption/EncryptionRequestHandlerTest.java   |  59 ++++-
 .../apache/solr/encryption/EncryptionTestUtil.java |  70 +++++-
 .../solr/encryption/EncryptionUpdateLogTest.java   |  13 +-
 .../TestingEncryptionRequestHandler.java           |  62 +++++
 .../kms/KmsEncryptionRequestHandlerTest.java       |  34 +--
 11 files changed, 568 insertions(+), 170 deletions(-)

diff --git a/encryption/build.gradle b/encryption/build.gradle
index 336862a..3eadcf4 100644
--- a/encryption/build.gradle
+++ b/encryption/build.gradle
@@ -33,22 +33,22 @@ sourceSets {
 }
 
 dependencies {
-    implementation 'org.apache.solr:solr-core:9.6.0'
-    implementation 'org.apache.lucene:lucene-core:9.10.0'
+    implementation 'org.apache.solr:solr-core:9.8.0'
+    implementation 'org.apache.lucene:lucene-core:9.11.1'
     implementation 'com.google.code.findbugs:jsr305:3.0.2'
 
-    // Optional, used by the KmsKeySupplier.
-    implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'
+    // Optional, used by the KmsKeySupplier example.
+    implementation 'com.github.ben-manes.caffeine:caffeine:3.2.0'
     implementation 'io.opentracing:opentracing-util:0.33.0'
 
     // Optional, commons-io and commons-codec are only required by the
     // tool class CharStreamEncrypter, which is not used for the index
     // encryption.
-    implementation 'commons-io:commons-io:2.11.0'
-    implementation 'commons-codec:commons-codec:1.16.0'
+    implementation 'commons-io:commons-io:2.18.0'
+    implementation 'commons-codec:commons-codec:1.18.0'
 
-    testImplementation 'org.apache.solr:solr-test-framework:9.6.0'
-    testImplementation 'org.apache.lucene:lucene-test-framework:9.10.0'
+    testImplementation 'org.apache.solr:solr-test-framework:9.8.0'
+    testImplementation 'org.apache.lucene:lucene-test-framework:9.11.1'
 }
 
 test {
diff --git 
a/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
 
b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
index 3941388..830a253 100644
--- 
a/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
+++ 
b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
@@ -19,8 +19,18 @@ package org.apache.solr.encryption;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
+import org.apache.solr.client.solrj.response.SimpleSolrResponse;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.RequestHandlerBase;
@@ -31,19 +41,29 @@ import org.apache.solr.security.PermissionNameProvider;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.UpdateHandler;
 import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.TimeOut;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.TIME_ALLOWED;
 import static org.apache.solr.encryption.CommitUtil.readLatestCommit;
 import static org.apache.solr.encryption.EncryptionUtil.*;
 
@@ -55,21 +75,30 @@ import static org.apache.solr.encryption.EncryptionUtil.*;
  * value {@link #NO_KEY_ID} must be provided.
  * <p>
  * The encryption processing is asynchronous. The request returns immediately 
with two response
- * parameters. {@link #ENCRYPTION_STATE} parameter with values {@link 
#STATE_PENDING},
- * {@link #STATE_COMPLETE}, or {@link #STATE_BUSY}. And {@link #STATUS} 
parameter with values
+ * parameters. {@link #ENCRYPTION_STATE} parameter with values {@link 
State#PENDING},
+ * {@link State#COMPLETE}, or {@link State#BUSY}. And {@link #STATUS} 
parameter with values
  * {@link #STATUS_SUCCESS} or {@link #STATUS_FAILURE}.
  * <p>
  * The expected usage of this handler is to first send an encryption request 
with a key id, and
- * receive a response with {@link #STATUS_SUCCESS} and a {@link 
#STATE_PENDING}. If the caller needs
+ * receive a response with {@link #STATUS_SUCCESS} and a {@link 
State#PENDING}. If the caller needs
  * to know when the encryption is complete, it can (optionally) repeatedly 
send the same encryption
  * request with the same key id, until it receives a response with {@link 
#STATUS_SUCCESS} and a
- * {@link #STATE_COMPLETE}.
+ * {@link State#COMPLETE}.
  * <p>
- * If the handler returns a response with {@link #STATE_BUSY}, it means that 
another encryption for a
+ * If the handler returns a response with {@link State#BUSY}, it means that 
another encryption for a
  * different key id is ongoing on the same Solr core. It cannot start a new 
encryption until it finishes.
  * <p>
  * If the handler returns a response with {@link #STATUS_FAILURE}, it means 
the request did not succeed
  * and should be retried by the caller (there should be error logs).
+ * <p>
+ * The caller can provide the additional parameter {@link 
org.apache.solr.common.params.CommonParams#DISTRIB} with
+ * value "true". In this case, this handler will distribute the encryption 
request to all the leader replicas of
+ * all the shards of the collection, ensuring they all encrypt their index 
shard. The response {@link #ENCRYPTION_STATE}
+ * will be {@link State#COMPLETE} only when all the shards return {@link 
State#COMPLETE}. So the caller may repeatedly
+ * send the same encryption request with {@link 
org.apache.solr.common.params.CommonParams#DISTRIB} "true"
+ * to know when the whole collection encryption is complete. In addition, the 
caller can set the
+ * {@link org.apache.solr.common.params.CommonParams#TIME_ALLOWED} parameter 
to define a timeout for the request
+ * distribution, in milliseconds. This timeout is for the distribution itself, 
not the encryption process.
  */
 public class EncryptionRequestHandler extends RequestHandlerBase {
 
@@ -112,18 +141,62 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
    */
   public static final String ENCRYPTION_STATE = "encryptionState";
   /**
-   * One of {@link #ENCRYPTION_STATE} values: the encryption with the provided 
key id is ongoing and pending.
-   */
-  public static final String STATE_PENDING = "pending";
-  /**
-   * One of {@link #ENCRYPTION_STATE} values: the encryption with the provided 
key id is complete.
+   * Enumeration of the {@link #ENCRYPTION_STATE} values.
    */
-  public static final String STATE_COMPLETE = "complete";
-  /**
-   * One of {@link #ENCRYPTION_STATE} values: another encryption for a 
different key id is ongoing
-   * on the same Solr core; cannot start a new encryption until it finishes.
-   */
-  public static final String STATE_BUSY = "busy";
+  public enum State {
+    /** The encryption with the provided key id is complete. */
+    COMPLETE("complete", 0),
+    /** The request distribution timed out (can only be returned when {@link 
org.apache.solr.common.params.CommonParams#DISTRIB} is set). */
+    TIMEOUT("timeout", 1),
+    /** The encryption with the provided key id is ongoing and pending. */
+    PENDING("pending", 2),
+    /** Another encryption for a different key id is ongoing on the same Solr 
core; cannot start a new encryption until it finishes. */
+    BUSY("busy", 3),
+    /** At least one distributed encryption request failed for a shard (can 
only be returned when {@link 
org.apache.solr.common.params.CommonParams#DISTRIB} is set). */
+    ERROR("error", 4),
+    /** The request distribution was interrupted (can only be returned when 
{@link org.apache.solr.common.params.CommonParams#DISTRIB} is set). */
+    INTERRUPTED("interrupted", 5);
+
+    public final String value;
+    private final int priority;
+
+    State(String value, int priority) {
+      this.value = value;
+      this.priority = priority;
+    }
+
+    public static State fromValue(String value) {
+      switch (value) {
+        case "complete":
+          return COMPLETE;
+        case "timeout":
+          return TIMEOUT;
+        case "pending":
+          return PENDING;
+        case "busy":
+          return BUSY;
+        case "error":
+          return ERROR;
+        case "interrupted":
+          return INTERRUPTED;
+        default:
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Unsupported encryption state '" + value + "'");
+      }
+    }
+
+    public boolean isSuccess() {
+      switch (this) {
+        case COMPLETE:
+        case PENDING:
+        case BUSY:
+          return true;
+        default:
+          return false;
+      }
+    }
+  }
+
+  private static final long DISTRIBUTION_MAX_ATTEMPTS = 3;
 
   private static final Object pendingEncryptionLock = new Object();
   private static final Map<String, PendingKeyId> pendingEncryptions = new 
HashMap<>();
@@ -185,7 +258,7 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
 
   @Override
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) 
throws Exception {
-    long startTimeMs = System.currentTimeMillis();
+    long startTimeNs = getTimeSource().getTimeNs();
     String keyId = req.getParams().get(PARAM_KEY_ID);
     if (keyId == null || keyId.isEmpty()) {
       rsp.add(STATUS, STATUS_FAILURE);
@@ -194,15 +267,22 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
     } else if (keyId.equals(NO_KEY_ID)) {
       keyId = null;
     }
+    // Check the defined DirectoryFactory instance.
     EncryptionDirectoryFactory.getFactory(req.getCore());
+
+    if (req.getParams().getBool(DISTRIB, false)) {
+      distributeRequest(req, rsp, keyId, startTimeNs);
+      return;
+    }
+
     log.debug("Encrypt request for keyId={}", keyId);
     boolean success = false;
-    String encryptionState = STATE_PENDING;
+    State state = State.PENDING;
     try {
       SegmentInfos segmentInfos = readLatestCommit(req.getCore());
       if (segmentInfos.size() == 0) {
         commitEmptyIndexForEncryption(keyId, segmentInfos, req, rsp);
-        encryptionState = STATE_COMPLETE;
+        state = State.COMPLETE;
         success = true;
         return;
       }
@@ -221,7 +301,7 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
         }
       }
       if (encryptionComplete) {
-        encryptionState = STATE_COMPLETE;
+        state = State.COMPLETE;
         success = true;
         return;
       }
@@ -231,12 +311,11 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
         if (pendingKeyId != null) {
           if (Objects.equals(pendingKeyId.keyId, keyId)) {
             log.debug("Ongoing encryption for keyId={}", keyId);
-            encryptionState = STATE_PENDING;
             success = true;
           } else {
             log.debug("Core busy encrypting for keyId={} different than 
requested keyId={}",
                       pendingKeyId.keyId, keyId);
-            encryptionState = STATE_BUSY;
+            state = State.BUSY;
           }
           return;
         }
@@ -244,7 +323,7 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
       }
       try {
         commitEncryptionStart(keyId, segmentInfos, req, rsp);
-        encryptAsync(req, startTimeMs);
+        encryptAsync(req);
         success = true;
       } finally {
         if (!success) {
@@ -255,17 +334,121 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
       }
 
     } finally {
-      if (success) {
-        rsp.add(STATUS, STATUS_SUCCESS);
-      } else {
-        rsp.add(STATUS, STATUS_FAILURE);
+      String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
+      rsp.add(STATUS, statusValue);
+      rsp.addToLog(STATUS, statusValue);
+      rsp.add(ENCRYPTION_STATE, state.value);
+      rsp.addToLog(ENCRYPTION_STATE, state.value);
+      log.info("Responding encryption state={} success={} for keyId={} 
timeMs={}",
+          state.value, success, keyId, toMs(elapsedTimeNs(startTimeNs)));
+    }
+  }
+
+  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId, long startTimeNs) {
+    boolean success = false;
+    State collectionState = null;
+    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+    TimeOut timeOut = timeAllowedMs <= 0 ? null : new TimeOut(timeAllowedMs, 
TimeUnit.MILLISECONDS, getTimeSource());
+    try {
+      String collectionName = 
req.getCore().getCoreDescriptor().getCollectionName();
+      if (collectionName == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
+      }
+      log.debug("Encrypt request distributed for keyId={}", keyId);
+      DocCollection docCollection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+      if (docCollection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not 
found.");
+      }
+      ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
+      Collection<Slice> slices = docCollection.getActiveSlices();
+      Collection<Callable<State>> encryptRequests = new 
ArrayList<>(slices.size());
+      for (Slice slice : slices) {
+        Replica replica = slice.getLeader();
+        if (replica == null) {
+          log.error("No leader found for shard {}", slice.getName());
+          collectionState = State.ERROR;
+          continue;
+        }
+        encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, req, 
params, keyId));
+      }
+      try {
+        List<Future<State>> responses = timeOut == null ?
+            executor.invokeAll(encryptRequests)
+            : executor.invokeAll(encryptRequests, 
timeOut.timeLeft(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+        for (Future<State> response : responses) {
+          State state;
+          try {
+            state = response.get();
+          } catch (ExecutionException e) {
+            log.error("Error distributing encryption request for keyId={}", 
keyId, e);
+            collectionState = State.ERROR;
+            break;
+          } catch (CancellationException e) {
+            log.warn("Cancelled distributing encryption request for keyId={}", 
keyId, e);
+            if (collectionState == null || State.TIMEOUT.priority > 
collectionState.priority) {
+              collectionState = State.TIMEOUT;
+            }
+            break;
+          }
+          if (collectionState == null || state.priority > 
collectionState.priority) {
+            collectionState = state;
+          }
+        }
+      } catch (InterruptedException e) {
+        collectionState = State.INTERRUPTED;
+      }
+      success = collectionState == null || collectionState.isSuccess();
+    } finally {
+      String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
+      rsp.add(STATUS, statusValue);
+      rsp.addToLog(STATUS, statusValue);
+      if (collectionState != null) {
+        rsp.add(ENCRYPTION_STATE, collectionState.value);
+        rsp.addToLog(ENCRYPTION_STATE, collectionState.value);
+      }
+      if (log.isInfoEnabled()) {
+        log.info("Responding encryption distributed state={} success={} for 
keyId={} timeMs={}",
+            (collectionState == null ? null : collectionState.value), success, 
keyId, toMs(elapsedTimeNs(startTimeNs)));
       }
-      log.info("Responding encryption state={} success={} for keyId={}",
-               encryptionState, success, keyId);
-      rsp.add(ENCRYPTION_STATE, encryptionState);
     }
   }
 
+  protected ModifiableSolrParams 
createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId) {
+    return new ModifiableSolrParams().set(PARAM_KEY_ID, keyId == null ? 
NO_KEY_ID : keyId);
+  }
+
+  private State sendEncryptionRequestWithRetry(
+      Replica replica,
+      SolrQueryRequest req,
+      ModifiableSolrParams params,
+      String keyId) {
+    for (int numAttempts = 0; numAttempts < DISTRIBUTION_MAX_ATTEMPTS; 
numAttempts++) {
+      try {
+        SimpleSolrResponse response = sendEncryptionRequest(replica, req, 
params);
+        Object responseStatus = response.getResponse().get(STATUS);
+        Object responseState = response.getResponse().get(ENCRYPTION_STATE);
+        log.info("Encryption state {} status {} for replica {} keyId {} in {} 
ms", responseState, responseStatus, replica.getName(), keyId, 
response.getElapsedTime());
+        if (responseState != null) {
+          return State.fromValue(responseState.toString());
+        }
+      } catch (SolrServerException | IOException e) {
+        log.error("Error occurred while sending encryption request", e);
+      }
+    }
+    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed 
encryption request to replica " + replica.getName() + " for keyId " + keyId);
+  }
+
+  private SimpleSolrResponse sendEncryptionRequest(
+      Replica replica,
+      SolrQueryRequest req,
+      ModifiableSolrParams params)
+      throws SolrServerException, IOException {
+    // Use the update-only http client, considering encryption is an update as 
we indeed create new Lucene segments.
+    Http2SolrClient solrClient = 
req.getCoreContainer().getUpdateShardHandler().getUpdateOnlyHttpClient();
+    GenericSolrRequest distributedRequest = new 
GenericSolrRequest(SolrRequest.METHOD.POST, req.getPath(), params);
+    return solrClient.requestWithBaseUrl(replica.getCoreUrl(), null, 
distributedRequest);
+  }
+
   private void commitEmptyIndexForEncryption(@Nullable String keyId,
                                              SegmentInfos segmentInfos,
                                              SolrQueryRequest req,
@@ -327,31 +510,33 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
     req.getCore().getUpdateHandler().commit(commitCmd);
   }
 
-  private void encryptAsync(SolrQueryRequest req, long startTimeMs) {
+  private void encryptAsync(SolrQueryRequest req) {
     log.debug("Submitting async encryption");
     executor.submit(() -> {
       try {
         EncryptionUpdateLog updateLog = getEncryptionUpdateLog(req.getCore());
         if (updateLog != null) {
           log.debug("Encrypting update log");
+          long startTimeNs = getTimeSource().getTimeNs();
           boolean logEncryptionComplete = updateLog.encryptLogs();
-          log.info("{} encrypted the update log in {}",
-                  logEncryptionComplete ? "Successfully" : "Partially", 
elapsedTime(startTimeMs));
+          log.info("{} encrypted the update log in {} ms",
+                  logEncryptionComplete ? "Successfully" : "Partially", 
toMs(elapsedTimeNs(startTimeNs)));
           // If the logs encryption is not complete, it means some logs are 
currently in use.
           // The encryption will be automatically be retried after the next 
commit which should
           // release the old transaction log and make it ready for encryption.
         }
 
         log.debug("Triggering index encryption");
+        long startTimeNs = getTimeSource().getTimeNs();
         CommitUpdateCommand commitCmd = new CommitUpdateCommand(req, true);
         // Trigger EncryptionMergePolicy.findForcedMerges() to re-encrypt
         // each segment which is not encrypted with the latest active key id.
         commitCmd.maxOptimizeSegments = Integer.MAX_VALUE;
         req.getCore().getUpdateHandler().commit(commitCmd);
-        log.info("Successfully triggered index encryption with commit in {}", 
elapsedTime(startTimeMs));
+        log.info("Successfully triggered index encryption with commit in {} 
ms", toMs(elapsedTimeNs(startTimeNs)));
 
       } catch (IOException e) {
-        log.error("Exception while encrypting the index after {}", 
elapsedTime(startTimeMs), e);
+        log.error("Exception while encrypting the index", e);
       } finally {
         synchronized (pendingEncryptionLock) {
           pendingEncryptions.remove(req.getCore().getName());
@@ -405,8 +590,17 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
     return Objects.equals(keyId, activeKeyId);
   }
 
-  private static String elapsedTime(long startTimeMs) {
-    return (System.currentTimeMillis() - startTimeMs) + " ms";
+  private long elapsedTimeNs(long startTimeNs) {
+    return getTimeSource().getTimeNs() - startTimeNs;
+  }
+
+  private static long toMs(long ns) {
+    return TimeUnit.NANOSECONDS.toMillis(ns);
+  }
+
+  // For testing.
+  protected TimeSource getTimeSource() {
+    return TimeSource.NANO_TIME;
   }
 
   /**
diff --git 
a/encryption/src/main/java/org/apache/solr/encryption/crypto/CharStreamEncrypter.java
 
b/encryption/src/main/java/org/apache/solr/encryption/crypto/CharStreamEncrypter.java
index 15a8b7b..3fd9ac6 100644
--- 
a/encryption/src/main/java/org/apache/solr/encryption/crypto/CharStreamEncrypter.java
+++ 
b/encryption/src/main/java/org/apache/solr/encryption/crypto/CharStreamEncrypter.java
@@ -39,6 +39,9 @@ import java.nio.charset.StandardCharsets;
  * buffers allocated. The encryption transformation is AES/CTR/NoPadding.
  * A secure random IV is generated for each encryption and appended as the 
first
  * appended chars.
+ * <p>
+ * This encryption tool is intended to encrypt write-once and then read-only 
strings;
+ * it should not be used to encrypt updatable content as CTR is not designed 
for that.
  */
 public class CharStreamEncrypter {
 
diff --git 
a/encryption/src/main/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandler.java
 
b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandler.java
index 65f5989..595deb0 100644
--- 
a/encryption/src/main/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandler.java
+++ 
b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandler.java
@@ -1,6 +1,7 @@
 package org.apache.solr.encryption.kms;
 
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.encryption.EncryptionRequestHandler;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
@@ -14,38 +15,45 @@ import java.util.Map;
  */
 public class KmsEncryptionRequestHandler extends EncryptionRequestHandler {
 
-    /**
-     * Tenant Id request parameter - required.
-     */
-    public static final String PARAM_TENANT_ID = "tenantId";
-    /**
-     * Data Key Blob request parameter - required.
-     */
-    public static final String PARAM_ENCRYPTION_KEY_BLOB = "encryptionKeyBlob";
+  /**
+   * Tenant Id request parameter - required.
+   */
+  public static final String PARAM_TENANT_ID = "tenantId";
+  /**
+   * Data Key Blob request parameter - required.
+   */
+  public static final String PARAM_ENCRYPTION_KEY_BLOB = "encryptionKeyBlob";
 
-    /**
-     *  Builds the KMS key cookie based on key id and key blob parameters of 
the request.
-     *  If a required parameter is missing, this method throws a {@link 
SolrException} with
-     *  {@link SolrException.ErrorCode#BAD_REQUEST} and sets the response 
status to failure.
-     */
-    @Override
-    protected Map<String, String> buildKeyCookie(String keyId,
-                                                 SolrQueryRequest req,
-                                                 SolrQueryResponse rsp) {
-        String tenantId = getRequiredRequestParam(req, PARAM_TENANT_ID, rsp);
-        String encryptionKeyBlob = getRequiredRequestParam(req, 
PARAM_ENCRYPTION_KEY_BLOB, rsp);
-        return Map.of(
-                PARAM_TENANT_ID, tenantId,
-                PARAM_ENCRYPTION_KEY_BLOB, encryptionKeyBlob
-        );
-    }
+  /**
+   * Builds the KMS key cookie based on key id and key blob parameters of the 
request.
+   * If a required parameter is missing, this method throws a {@link 
SolrException} with
+   * {@link SolrException.ErrorCode#BAD_REQUEST} and sets the response status 
to failure.
+   */
+  @Override
+  protected Map<String, String> buildKeyCookie(String keyId,
+                                               SolrQueryRequest req,
+                                               SolrQueryResponse rsp) {
+    String tenantId = getRequiredRequestParam(req, PARAM_TENANT_ID, rsp);
+    String encryptionKeyBlob = getRequiredRequestParam(req, 
PARAM_ENCRYPTION_KEY_BLOB, rsp);
+    return Map.of(
+        PARAM_TENANT_ID, tenantId,
+        PARAM_ENCRYPTION_KEY_BLOB, encryptionKeyBlob
+    );
+  }
 
-    private String getRequiredRequestParam(SolrQueryRequest req, String param, 
SolrQueryResponse rsp) {
-        String arg = req.getParams().get(param);
-        if (arg == null || arg.isEmpty()) {
-            rsp.add(STATUS, STATUS_FAILURE);
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, 
"Required parameter " + param + " must be present and not empty.");
-        }
-        return arg;
+  private String getRequiredRequestParam(SolrQueryRequest req, String param, 
SolrQueryResponse rsp) {
+    String arg = req.getParams().get(param);
+    if (arg == null || arg.isEmpty()) {
+      rsp.add(STATUS, STATUS_FAILURE);
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Required 
parameter " + param + " must be present and not empty.");
     }
+    return arg;
+  }
+
+  @Override
+  protected ModifiableSolrParams 
createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId) {
+    return super.createDistributedRequestParams(req, rsp, keyId)
+        .set(PARAM_TENANT_ID, getRequiredRequestParam(req, PARAM_TENANT_ID, 
rsp))
+        .set(PARAM_ENCRYPTION_KEY_BLOB, getRequiredRequestParam(req, 
PARAM_ENCRYPTION_KEY_BLOB, rsp));
+  }
 }
diff --git 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionDirectoryTest.java
 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionDirectoryTest.java
index f868848..f278701 100644
--- 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionDirectoryTest.java
+++ 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionDirectoryTest.java
@@ -30,7 +30,10 @@ import org.junit.Test;
 import org.apache.solr.encryption.crypto.AesCtrEncrypterFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -44,16 +47,11 @@ import static 
org.apache.solr.encryption.TestingKeySupplier.KEY_SECRET_2;
 
 /**
  * Tests {@link EncryptionDirectory}.
- * <p>
- * This test class ignores the DirectoryFactory defined in solrconfig.xml to 
use
- * {@link EncryptionDirectoryFactory}.
  */
 public class EncryptionDirectoryTest extends SolrCloudTestCase {
 
   private static final String COLLECTION_PREFIX = 
EncryptionDirectoryTest.class.getSimpleName() + "-collection-";
 
-  private static MockEncryptionDirectory mockDir;
-
   private String collectionName;
   private CloudSolrClient solrClient;
   private EncryptionTestUtil testUtil;
@@ -80,14 +78,13 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
     solrClient = cluster.getSolrClient();
     CollectionAdminRequest.createCollection(collectionName, 2, 
2).process(solrClient);
     cluster.waitForActiveCollection(collectionName, 2, 4);
-    testUtil = new EncryptionTestUtil(solrClient, collectionName);
+    testUtil = new EncryptionTestUtil(solrClient, collectionName)
+        .setShouldDistributeRequests(false);
   }
 
   @Override
   public void tearDown() throws Exception {
-    if (mockDir != null) {
-      mockDir.clearMockValues();
-    }
+    MockFactory.clearMockValues();
     
CollectionAdminRequest.deleteCollection(collectionName).process(solrClient);
     super.tearDown();
   }
@@ -106,7 +103,7 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
    */
   private void indexAndEncryptOneSegment() throws Exception {
     // Start with no key ids defined in the latest commit metadata.
-    mockDir.clearMockValues();
+    MockFactory.clearMockValues();
     // Create 2 index segments without encryption.
     testUtil.indexDocsAndCommit("weather broadcast");
     testUtil.indexDocsAndCommit("sunny weather");
@@ -120,23 +117,23 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
 
     // Set the encryption key id in the commit user data,
     // and run an optimized commit to rewrite the index, now encrypted.
-    mockDir.setKeysInCommitUserData(KEY_ID_1);
+    MockFactory.setKeysInCommitUserData(KEY_ID_1);
     optimizeCommit();
 
     // Verify that without key id, we cannot decrypt the index anymore.
-    mockDir.forceClearText = true;
+    MockFactory.forceClearText = true;
     testUtil.assertCannotReloadCores();
     // Verify that with a wrong key id, we cannot decrypt the index.
-    mockDir.forceClearText = false;
-    mockDir.forceKeySecret = KEY_SECRET_2;
+    MockFactory.forceClearText = false;
+    MockFactory.forceKeySecret = KEY_SECRET_2;
     testUtil.assertCannotReloadCores();
     // Verify that with the right key id, we can decrypt the index and search 
it.
-    mockDir.forceKeySecret = null;
-    mockDir.expectedKeySecret = KEY_SECRET_1;
+    MockFactory.forceKeySecret = null;
+    MockFactory.expectedKeySecrets = List.of(KEY_SECRET_1);
     testUtil.reloadCores();
     testUtil.assertQueryReturns("weather", 2);
     testUtil.assertQueryReturns("sunny", 1);
-    mockDir.clearMockValues();
+    MockFactory.clearMockValues();
   }
 
   /**
@@ -156,24 +153,24 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
     indexAndEncryptOneSegment();
 
     // Create 1 new segment with the same encryption key id.
-    mockDir.setKeysInCommitUserData(KEY_ID_1);
+    MockFactory.setKeysInCommitUserData(KEY_ID_1);
     testUtil.indexDocsAndCommit("foggy weather");
     testUtil.indexDocsAndCommit("boo");
 
     // Verify that without key id, we cannot decrypt the index.
-    mockDir.forceClearText = true;
+    MockFactory.forceClearText = true;
     testUtil.assertCannotReloadCores();
     // Verify that with a wrong key id, we cannot decrypt the index.
-    mockDir.forceClearText = false;
-    mockDir.forceKeySecret = KEY_SECRET_2;
+    MockFactory.forceClearText = false;
+    MockFactory.forceKeySecret = KEY_SECRET_2;
     testUtil.assertCannotReloadCores();
     // Verify that with the right key id, we can decrypt the index and search 
it.
-    mockDir.forceKeySecret = null;
-    mockDir.expectedKeySecret = KEY_SECRET_1;
+    MockFactory.forceKeySecret = null;
+    MockFactory.expectedKeySecrets = List.of(KEY_SECRET_1);
     testUtil.reloadCores();
     testUtil.assertQueryReturns("weather", 3);
     testUtil.assertQueryReturns("sunny", 1);
-    mockDir.clearMockValues();
+    MockFactory.clearMockValues();
   }
 
   /**
@@ -186,19 +183,19 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
 
     // Set the new encryption key id in the commit user data,
     // and run an optimized commit to rewrite the index, now encrypted with 
the new key.
-    mockDir.setKeysInCommitUserData(KEY_ID_1, KEY_ID_2);
+    MockFactory.setKeysInCommitUserData(KEY_ID_1, KEY_ID_2);
     optimizeCommit();
 
     // Verify that without key id, we cannot decrypt the index.
-    mockDir.forceClearText = true;
+    MockFactory.forceClearText = true;
     testUtil.assertCannotReloadCores();
     // Verify that with a wrong key id, we cannot decrypt the index.
-    mockDir.forceClearText = false;
-    mockDir.forceKeySecret = KEY_SECRET_1;
+    MockFactory.forceClearText = false;
+    MockFactory.forceKeySecret = KEY_SECRET_1;
     testUtil.assertCannotReloadCores();
     // Verify that with the right key id, we can decrypt the index and search 
it.
-    mockDir.forceKeySecret = null;
-    mockDir.expectedKeySecret = KEY_SECRET_2;
+    MockFactory.forceKeySecret = null;
+    MockFactory.expectedKeySecrets = List.of(KEY_SECRET_1, KEY_SECRET_2);
     testUtil.reloadCores();
     testUtil.assertQueryReturns("weather", 3);
     testUtil.assertQueryReturns("sunny", 1);
@@ -214,11 +211,11 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
 
     // Remove the active key parameter from the commit user data,
     // and run an optimized commit to rewrite the index, now cleartext with no 
keys.
-    mockDir.setKeysInCommitUserData(KEY_ID_1, null);
+    MockFactory.setKeysInCommitUserData(KEY_ID_1, null);
     optimizeCommit();
 
     // Verify that without key id, we can reload the index because it is not 
encrypted.
-    mockDir.forceClearText = true;
+    MockFactory.forceClearText = true;
     testUtil.reloadCores();
     testUtil.assertQueryReturns("weather", 3);
     testUtil.assertQueryReturns("sunny", 1);
@@ -232,7 +229,7 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
    * {@link EncryptionRequestHandler}, but this test is designed to work 
independently.
    */
   private void optimizeCommit() {
-    testUtil.forAllReplicas(replica -> {
+    testUtil.forAllReplicas(false, replica -> {
       UpdateRequest request = new UpdateRequest();
       request.setAction(UpdateRequest.ACTION.OPTIMIZE, true, true, 1);
       testUtil.requestCore(request, replica);
@@ -240,20 +237,41 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
   }
 
   public static class MockFactory implements 
EncryptionDirectoryFactory.InnerFactory {
+
+    static final List<MockEncryptionDirectory> mockDirs = new ArrayList<>();
+
+    static boolean forceClearText;
+    static byte[] forceKeySecret;
+    static List<byte[]> expectedKeySecrets;
+
+    static void clearMockValues() {
+      forceClearText = false;
+      forceKeySecret = null;
+      expectedKeySecrets = null;
+      for (MockEncryptionDirectory mockDir : mockDirs) {
+        mockDir.clearMockValues();
+      }
+    }
+
+    static void setKeysInCommitUserData(String... keyIds) throws IOException {
+      for (MockEncryptionDirectory mockDir : mockDirs) {
+        mockDir.setKeysInCommitUserData(keyIds);
+      }
+    }
+
     @Override
     public EncryptionDirectory create(Directory delegate,
                                       AesCtrEncrypterFactory encrypterFactory,
                                       KeySupplier keySupplier) throws 
IOException {
-      return mockDir = new MockEncryptionDirectory(delegate, encrypterFactory, 
keySupplier);
+      MockEncryptionDirectory mockDir = new MockEncryptionDirectory(delegate, 
encrypterFactory, keySupplier);
+      mockDirs.add(mockDir);
+      return mockDir;
     }
   }
 
   private static class MockEncryptionDirectory extends EncryptionDirectory {
 
     final KeySupplier keySupplier;
-    boolean forceClearText;
-    byte[] forceKeySecret;
-    byte[] expectedKeySecret;
 
     MockEncryptionDirectory(Directory delegate, AesCtrEncrypterFactory 
encrypterFactory, KeySupplier keySupplier)
       throws IOException {
@@ -263,9 +281,6 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
 
     void clearMockValues() {
       commitUserData = new CommitUserData(commitUserData.segmentFileName, 
Map.of());
-      forceClearText = false;
-      forceKeySecret = null;
-      expectedKeySecret = null;
     }
 
     /**
@@ -285,7 +300,7 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
 
     @Override
     public IndexInput openInput(String fileName, IOContext context) throws 
IOException {
-      return forceClearText ? in.openInput(fileName, context) : 
super.openInput(fileName, context);
+      return MockFactory.forceClearText ? in.openInput(fileName, context) : 
super.openInput(fileName, context);
     }
 
     @Override
@@ -298,12 +313,19 @@ public class EncryptionDirectoryTest extends 
SolrCloudTestCase {
 
     @Override
     protected byte[] getKeySecret(String keyRef) throws IOException {
-      if (forceKeySecret != null) {
-        return forceKeySecret;
+      if (MockFactory.forceKeySecret != null) {
+        return MockFactory.forceKeySecret;
       }
       byte[] keySecret = super.getKeySecret(keyRef);
-      if (expectedKeySecret != null) {
-        assertArrayEquals(expectedKeySecret, keySecret);
+      if (MockFactory.expectedKeySecrets != null) {
+        boolean keySecretMatches = false;
+        for (byte[] expectedKeySecret : MockFactory.expectedKeySecrets) {
+          if (Arrays.equals(expectedKeySecret, keySecret)) {
+            keySecretMatches = true;
+            break;
+          }
+        }
+        assertTrue(keySecretMatches);
       }
       return keySecret;
     }
diff --git 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
index e5971d5..86be5b2 100644
--- 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
+++ 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
@@ -57,7 +57,7 @@ import static 
org.apache.solr.encryption.TestingKeySupplier.KEY_ID_3;
 public class EncryptionHeavyLoadTest extends SolrCloudTestCase {
 
   // Change the test duration manually to run longer, e.g. 20 minutes.
-  private static final long TEST_DURATION_MS = TimeUnit.SECONDS.toMillis(10);
+  private static final long TEST_DURATION_NS = TimeUnit.SECONDS.toNanos(10);
   private static final int RANDOM_DELAY_BETWEEN_INDEXING_BATCHES_MS = 50;
   private static final int RANDOM_NUM_DOCS_PER_BATCH = 200;
   private static final float PROBABILITY_OF_COMMIT_PER_BATCH = 0.33f;
@@ -81,9 +81,9 @@ public class EncryptionHeavyLoadTest extends 
SolrCloudTestCase {
   private int nextKeyIndex;
   private String keyId;
   private volatile Exception exception;
-  private long startTimeMs;
-  private long endTimeMs;
-  private long lastDisplayTimeMs;
+  private long startTimeNs;
+  private long endTimeNs;
+  private long lastDisplayTimeNs;
 
   @BeforeClass
   public static void beforeClass() throws Exception {
@@ -123,8 +123,8 @@ public class EncryptionHeavyLoadTest extends 
SolrCloudTestCase {
           System.err.println("Interrupted while closing " + thread.getName());
         }
       }
-      startTimeMs = lastDisplayTimeMs = System.currentTimeMillis();
-      endTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(20);
+      startTimeNs = lastDisplayTimeNs = System.nanoTime();
+      endTimeNs = startTimeNs + TimeUnit.SECONDS.toNanos(20);
       print("waiting for the final encryption completion");
       assertTrue("Timeout waiting for the final encryption completion", 
encrypt(keyId, true));
       print("final encryption complete");
@@ -136,8 +136,8 @@ public class EncryptionHeavyLoadTest extends 
SolrCloudTestCase {
   @Test
   public void testReencryptionUnderHeavyConcurrentLoad() throws Exception {
     print("Starting test");
-    startTimeMs = lastDisplayTimeMs = System.currentTimeMillis();
-    endTimeMs = startTimeMs + TEST_DURATION_MS;
+    startTimeNs = lastDisplayTimeNs = System.nanoTime();
+    endTimeNs = startTimeNs + TEST_DURATION_NS;
     Random random = random();
     if (random.nextBoolean()) {
       print("preparing empty index for encryption");
@@ -149,8 +149,9 @@ public class EncryptionHeavyLoadTest extends 
SolrCloudTestCase {
       Thread.sleep(random.nextInt(RANDOM_DELAY_BETWEEN_REENCRYPTION_MS));
       encrypt(nextKeyId(), waitForCompletion(random));
     }
-    if (System.currentTimeMillis() - lastDisplayTimeMs >= 1000) {
-      print("elapsed time = " + ((System.currentTimeMillis() - startTimeMs) / 
1000) + " s");
+    long timeNs = System.nanoTime();
+    if (timeNs - lastDisplayTimeNs >= TimeUnit.SECONDS.toNanos(1)) {
+      print("elapsed time = " + TimeUnit.NANOSECONDS.toSeconds(timeNs - 
startTimeNs) + " s");
     }
     print("Stopping test");
     if (exception != null) {
@@ -170,12 +171,12 @@ public class EncryptionHeavyLoadTest extends 
SolrCloudTestCase {
   }
 
   private boolean isTimeElapsed() {
-    long timeMs = System.currentTimeMillis();
-    if (timeMs - lastDisplayTimeMs >= 10000) {
-      print("elapsed time = " + ((timeMs - startTimeMs) / 1000) + " s");
-      lastDisplayTimeMs = timeMs;
+    long timeNs = System.nanoTime();
+    if (timeNs - lastDisplayTimeNs >= TimeUnit.SECONDS.toNanos(10)) {
+      print("elapsed time = " + TimeUnit.NANOSECONDS.toSeconds(timeNs - 
startTimeNs) + " s");
+      lastDisplayTimeNs = timeNs;
     }
-    return timeMs >= endTimeMs;
+    return timeNs >= endTimeNs;
   }
 
   private String nextKeyId() {
diff --git 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
index 30cfe2b..729e35d 100644
--- 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
+++ 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 
 import static 
org.apache.solr.encryption.EncryptionDirectoryFactory.PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY;
 import static org.apache.solr.encryption.EncryptionRequestHandler.NO_KEY_ID;
+import static 
org.apache.solr.encryption.EncryptionRequestHandler.STATUS_SUCCESS;
 import static org.apache.solr.encryption.EncryptionUtil.getKeyIdFromCommit;
 import static org.apache.solr.encryption.TestingKeySupplier.KEY_ID_1;
 import static org.apache.solr.encryption.TestingKeySupplier.KEY_ID_2;
@@ -46,6 +47,8 @@ public class EncryptionRequestHandlerTest extends 
SolrCloudTestCase {
 
   private static final String COLLECTION_PREFIX = 
EncryptionRequestHandlerTest.class.getSimpleName() + "-collection-";
 
+  protected static String configDir = "collection1";
+
   private static volatile boolean forceClearText;
   private static volatile String soleKeyIdAllowed;
 
@@ -58,7 +61,7 @@ public class EncryptionRequestHandlerTest extends 
SolrCloudTestCase {
     System.setProperty(PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY, 
MockFactory.class.getName());
     EncryptionTestUtil.setInstallDirProperty();
     cluster = new MiniSolrCloudCluster.Builder(2, createTempDir())
-      .addConfig("config", EncryptionTestUtil.getRandomConfigPath())
+      .addConfig("config", EncryptionTestUtil.getConfigPath(configDir))
       .configure();
   }
 
@@ -199,9 +202,63 @@ public class EncryptionRequestHandlerTest extends 
SolrCloudTestCase {
     testUtil.assertQueryReturns("weather", 4);
   }
 
+  @Test
+  public void testDistributionTimeout() throws Exception {
+    // Ensure the next distributed requests will time out.
+    testUtil
+        .setShouldDistributeRequests(true)
+        .setDistributionTimeoutMs(1); // any value > 0 will trigger the mock 
timeout.
+    TestingEncryptionRequestHandler.isDistributionTimeout = true;
+
+    // Send an encrypt request with a key id on an empty index.
+    EncryptionTestUtil.EncryptionStatus encryptionStatus = 
testUtil.encrypt(KEY_ID_1);
+
+    // Verify that the distribution timeout is handled with the appropriate 
response status.
+    assertFalse(encryptionStatus.isSuccess());
+    assertFalse(encryptionStatus.isComplete());
+    assertEquals(EncryptionRequestHandler.State.TIMEOUT, 
encryptionStatus.getCollectionState());
+  }
+
+  @Test
+  public void testDistributionState() throws Exception {
+    // Ensure the next distributed requests will return PENDING state.
+    testUtil.setShouldDistributeRequests(true);
+    TestingEncryptionRequestHandler.mockedDistributedResponseStatus = 
STATUS_SUCCESS;
+    TestingEncryptionRequestHandler.mockedDistributedResponseState = 
EncryptionRequestHandler.State.PENDING;
+
+    // Send an encrypt request with a key id on an empty index.
+    EncryptionTestUtil.EncryptionStatus encryptionStatus = 
testUtil.encrypt(KEY_ID_1);
+
+    // Verify that the distribution is successful with the PENDING state.
+    assertTrue(encryptionStatus.isSuccess());
+    assertEquals(EncryptionRequestHandler.State.PENDING, 
encryptionStatus.getCollectionState());
+
+    // Ensure the next distributed requests will return BUSY state.
+    TestingEncryptionRequestHandler.mockedDistributedResponseState = 
EncryptionRequestHandler.State.BUSY;
+
+    // Send an encrypt request with a key id on an empty index.
+    encryptionStatus = testUtil.encrypt(KEY_ID_1);
+
+    // Verify that the distribution is successful with the BUSY state.
+    assertTrue(encryptionStatus.isSuccess());
+    assertEquals(EncryptionRequestHandler.State.BUSY, 
encryptionStatus.getCollectionState());
+
+    // Ensure the next distributed requests return regular state.
+    TestingEncryptionRequestHandler.mockedDistributedResponseStatus = null;
+    TestingEncryptionRequestHandler.mockedDistributedResponseState = null;
+
+    // Send an encrypt request with a key id on an empty index.
+    encryptionStatus = testUtil.encrypt(KEY_ID_1);
+
+    // Verify that the distribution is successful with the COMPLETE state.
+    assertTrue(encryptionStatus.isSuccess());
+    assertEquals(EncryptionRequestHandler.State.COMPLETE, 
encryptionStatus.getCollectionState());
+  }
+
   private static void clearMockValues() {
     forceClearText = false;
     soleKeyIdAllowed = null;
+    TestingEncryptionRequestHandler.clearMockedValues();
   }
 
   public static class MockFactory implements 
EncryptionDirectoryFactory.InnerFactory {
diff --git 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java
index 3321bef..286f48b 100644
--- 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java
+++ 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.RetryUtil;
 import org.apache.solr.encryption.kms.TestingKmsClient;
@@ -42,9 +43,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.apache.lucene.tests.util.LuceneTestCase.random;
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.TIME_ALLOWED;
 import static 
org.apache.solr.encryption.EncryptionRequestHandler.ENCRYPTION_STATE;
 import static org.apache.solr.encryption.EncryptionRequestHandler.PARAM_KEY_ID;
-import static 
org.apache.solr.encryption.EncryptionRequestHandler.STATE_COMPLETE;
 import static org.apache.solr.encryption.EncryptionRequestHandler.STATUS;
 import static 
org.apache.solr.encryption.EncryptionRequestHandler.STATUS_SUCCESS;
 import static 
org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_ENCRYPTION_KEY_BLOB;
@@ -70,12 +72,31 @@ public class EncryptionTestUtil {
   private final CloudSolrClient cloudSolrClient;
   private final String collectionName;
   private int docId;
+  private Boolean shouldDistributeRequests;
+  private long distributionTimeoutMs;
 
   public EncryptionTestUtil(CloudSolrClient cloudSolrClient, String 
collectionName) {
     this.cloudSolrClient = cloudSolrClient;
     this.collectionName = collectionName;
   }
 
+  public boolean shouldDistributeEncryptRequest() {
+    if (shouldDistributeRequests == null) {
+      setShouldDistributeRequests(random().nextBoolean());
+    }
+    return shouldDistributeRequests;
+  }
+
+  public EncryptionTestUtil setShouldDistributeRequests(Boolean 
shouldDistributeRequests) {
+    this.shouldDistributeRequests = shouldDistributeRequests;
+    return this;
+  }
+
+  public EncryptionTestUtil setDistributionTimeoutMs(long 
distributionTimeoutMs) {
+    this.distributionTimeoutMs = distributionTimeoutMs;
+    return this;
+  }
+
   /**
    * Sets the "solr.install.dir" system property.
    */
@@ -130,16 +151,36 @@ public class EncryptionTestUtil {
     params.set(PARAM_KEY_ID, keyId);
     params.set(PARAM_TENANT_ID, TENANT_ID);
     params.set(PARAM_ENCRYPTION_KEY_BLOB, generateKeyBlob(keyId));
+    if (shouldDistributeEncryptRequest()) {
+      return encryptDistrib(params);
+    }
     GenericSolrRequest encryptRequest = new 
GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params);
     EncryptionStatus encryptionStatus = new EncryptionStatus();
-    forAllReplicas(replica -> {
-      NamedList<Object> response = requestCore(encryptRequest, replica);
+    forAllReplicas(false, replica -> {
+      NamedList<?> response = requestCore(encryptRequest, replica);
+      EncryptionRequestHandler.State state = 
EncryptionRequestHandler.State.fromValue(response.get(ENCRYPTION_STATE).toString());
       encryptionStatus.success &= response.get(STATUS).equals(STATUS_SUCCESS);
-      encryptionStatus.complete &= 
response.get(ENCRYPTION_STATE).equals(STATE_COMPLETE);
+      encryptionStatus.complete &= state == 
EncryptionRequestHandler.State.COMPLETE;
     });
     return encryptionStatus;
   }
 
+  private EncryptionStatus encryptDistrib(SolrParams params) throws 
SolrServerException, IOException {
+    ModifiableSolrParams modifiableParams = new 
ModifiableSolrParams().set(DISTRIB, true);
+    if (distributionTimeoutMs != 0 || random().nextBoolean()) {
+      modifiableParams.set(TIME_ALLOWED, 
String.valueOf(distributionTimeoutMs));
+    }
+    params = SolrParams.wrapDefaults(params, modifiableParams);
+    GenericSolrRequest encryptRequest = new 
GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params);
+    NamedList<Object> response = cloudSolrClient.request(encryptRequest, 
collectionName);
+    EncryptionRequestHandler.State state = 
EncryptionRequestHandler.State.fromValue(response.get(ENCRYPTION_STATE).toString());
+    EncryptionStatus encryptionStatus = new EncryptionStatus();
+    encryptionStatus.success = response.get(STATUS).equals(STATUS_SUCCESS);
+    encryptionStatus.complete = state == 
EncryptionRequestHandler.State.COMPLETE;
+    encryptionStatus.collectionState = state;
+    return encryptionStatus;
+  }
+
   private String generateKeyBlob(String keyId) throws Exception {
     return TestingKmsClient.singleton == null ?
             generateMockKeyBlob(keyId)
@@ -187,10 +228,9 @@ public class EncryptionTestUtil {
    */
   public void reloadCores() throws Exception {
     try {
-      forAllReplicas(replica -> {
+      forAllReplicas(shouldDistributeEncryptRequest(), replica -> {
         try {
           CoreAdminRequest req = new CoreAdminRequest();
-          req.setBasePath(replica.getBaseUrl());
           req.setCoreName(replica.getCoreName());
           req.setAction(CoreAdminParams.CoreAdminAction.RELOAD);
           try (Http2SolrClient httpSolrClient = new 
Http2SolrClient.Builder(replica.getBaseUrl()).build()) {
@@ -220,18 +260,21 @@ public class EncryptionTestUtil {
   }
 
   /** Processes the given {@code action} for all replicas of the collection 
defined in the constructor. */
-  public void forAllReplicas(Consumer<Replica> action) {
+  public void forAllReplicas(boolean onlyLeaders, Consumer<Replica> action) {
     for (Slice slice : 
cloudSolrClient.getClusterState().getCollection(collectionName).getSlices()) {
-      for (Replica replica : slice.getReplicas()) {
-        action.accept(replica);
+      if (onlyLeaders) {
+        action.accept(slice.getLeader());
+      } else {
+        for (Replica replica : slice.getReplicas()) {
+          action.accept(replica);
+        }
       }
     }
   }
 
   /** Sends the given {@link SolrRequest} to a specific replica. */
   public NamedList<Object> requestCore(SolrRequest<?> request, Replica 
replica) {
-    request.setBasePath(replica.getCoreUrl());
-    try (Http2SolrClient httpSolrClient = new 
Http2SolrClient.Builder(replica.getBaseUrl()).build()) {
+    try (Http2SolrClient httpSolrClient = new 
Http2SolrClient.Builder(replica.getCoreUrl()).build()) {
       return httpSolrClient.request(request);
     } catch (SolrServerException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -251,6 +294,7 @@ public class EncryptionTestUtil {
 
     private boolean success;
     private boolean complete;
+    private EncryptionRequestHandler.State collectionState;
 
     private EncryptionStatus() {
       this.success = true;
@@ -264,5 +308,9 @@ public class EncryptionTestUtil {
     public boolean isComplete() {
       return complete;
     }
+
+    public EncryptionRequestHandler.State getCollectionState() {
+      return collectionState;
+    }
   }
 }
diff --git 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java
 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java
index 049efa1..21f5293 100644
--- 
a/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java
+++ 
b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java
@@ -112,7 +112,8 @@ public class EncryptionUpdateLogTest extends 
SolrCloudTestCase {
 
     resetCounters();
     solrClient.add(collectionName, sdoc("id", "1", "text", "test"));
-    assertEquals(NUM_REPLICAS, encryptedLogWriteCount.get());
+    // If the encrypt request is distributed, we expect only the leader to 
encrypt its update log.
+    assertEquals(testUtil.shouldDistributeEncryptRequest() ? 1 : NUM_REPLICAS, 
encryptedLogWriteCount.get());
     assertEquals(0, encryptedLogReadCount.get());
     assertEquals(0, reencryptionCallCount.get());
 
@@ -134,7 +135,8 @@ public class EncryptionUpdateLogTest extends 
SolrCloudTestCase {
 
     resetCounters();
     solrClient.add(collectionName, sdoc("id", "1", "text", "test"));
-    assertEquals(NUM_REPLICAS, encryptedLogWriteCount.get());
+    // If the encrypt request is distributed, we expect only the leader to 
encrypt its update log.
+    assertEquals(testUtil.shouldDistributeEncryptRequest() ? 1 : NUM_REPLICAS, 
encryptedLogWriteCount.get());
     assertEquals(0, encryptedLogReadCount.get());
     assertEquals(0, reencryptionCallCount.get());
 
@@ -145,12 +147,13 @@ public class EncryptionUpdateLogTest extends 
SolrCloudTestCase {
     assertEquals(0, encryptedLogWriteCount.get());
     assertEquals(0, encryptedLogReadCount.get());
     // There are two transaction logs, the old one before the commit in 
checkEncryptionFromNoKeysToOneKey
-    // and the current one. So we expect each transaction log to be encrypted 
on all 2 replicas.
-    assertEquals(2 * NUM_REPLICAS, reencryptionCallCount.get());
+    // and the current one. So we expect each transaction log to be encrypted 
on either all replicas,
+    // or only the leader.
+    assertEquals(2 * (testUtil.shouldDistributeEncryptRequest() ? 1 : 
NUM_REPLICAS), reencryptionCallCount.get());
 
     resetCounters();
     solrClient.add(collectionName, sdoc("id", "2", "text", "test"));
-    assertEquals(toKeyId.equals(NO_KEY_ID) ? 0 : NUM_REPLICAS, 
encryptedLogWriteCount.get());
+    assertEquals(toKeyId.equals(NO_KEY_ID) ? 0 : 
(testUtil.shouldDistributeEncryptRequest() ? 1 : NUM_REPLICAS), 
encryptedLogWriteCount.get());
     assertEquals(0, encryptedLogReadCount.get());
     assertEquals(0, reencryptionCallCount.get());
 
diff --git 
a/encryption/src/test/java/org/apache/solr/encryption/TestingEncryptionRequestHandler.java
 
b/encryption/src/test/java/org/apache/solr/encryption/TestingEncryptionRequestHandler.java
index 0e182f3..29fc528 100644
--- 
a/encryption/src/test/java/org/apache/solr/encryption/TestingEncryptionRequestHandler.java
+++ 
b/encryption/src/test/java/org/apache/solr/encryption/TestingEncryptionRequestHandler.java
@@ -16,11 +16,15 @@
  */
 package org.apache.solr.encryption;
 
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
 
 /**
  * {@link EncryptionRequestHandler} for tests. Builds a mock key cookie.
@@ -29,6 +33,64 @@ public class TestingEncryptionRequestHandler extends 
EncryptionRequestHandler {
 
   public static final Map<String, String> MOCK_COOKIE_PARAMS = 
Map.of("testParam", "testValue");
 
+  private static final TimeSource TIMEOUT_TIME_SOURCE = new TimeSource() {
+    @Override
+    public long getTimeNs() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public long getEpochTimeNs() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long[] getTimeAndEpochNs() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void sleep(long ms) throws InterruptedException {
+      NANO_TIME.sleep(ms);
+    }
+
+    @Override
+    public long convertDelay(TimeUnit fromUnit, long delay, TimeUnit toUnit) {
+      return NANO_TIME.convertDelay(fromUnit, delay, toUnit);
+    }
+  };
+
+  public static volatile String mockedDistributedResponseStatus;
+  public static volatile State mockedDistributedResponseState;
+  public static volatile boolean isDistributionTimeout;
+
+  public static void clearMockedValues() {
+    mockedDistributedResponseStatus = null;
+    mockedDistributedResponseState = null;
+    isDistributionTimeout = false;
+  }
+
+  @Override
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) 
throws Exception {
+    if (!req.getParams().getBool(DISTRIB, false)) {
+      if (mockedDistributedResponseStatus != null || 
mockedDistributedResponseState != null) {
+        if (mockedDistributedResponseStatus != null) {
+          rsp.add(STATUS, mockedDistributedResponseStatus);
+        }
+        if (mockedDistributedResponseState != null) {
+          rsp.add(ENCRYPTION_STATE, mockedDistributedResponseState.value);
+        }
+        return;
+      }
+    }
+    super.handleRequestBody(req, rsp);
+  }
+
+  @Override
+  protected TimeSource getTimeSource() {
+    return isDistributionTimeout ? TIMEOUT_TIME_SOURCE : super.getTimeSource();
+  }
+
   @Override
   protected Map<String, String> buildKeyCookie(String keyId,
                                                SolrQueryRequest req,
diff --git 
a/encryption/src/test/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandlerTest.java
 
b/encryption/src/test/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandlerTest.java
index 24780a6..9204060 100644
--- 
a/encryption/src/test/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandlerTest.java
+++ 
b/encryption/src/test/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandlerTest.java
@@ -2,13 +2,10 @@ package org.apache.solr.encryption.kms;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.GenericSolrRequest;
-import org.apache.solr.cloud.MiniSolrCloudCluster;
-import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.encryption.EncryptionTestUtil;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.solr.encryption.EncryptionRequestHandlerTest;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.solr.encryption.EncryptionRequestHandler.PARAM_KEY_ID;
@@ -18,19 +15,10 @@ import static 
org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_T
 /**
  * Tests {@link KmsEncryptionRequestHandler}.
  */
-public class KmsEncryptionRequestHandlerTest extends SolrCloudTestCase {
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    EncryptionTestUtil.setInstallDirProperty();
-    cluster = new MiniSolrCloudCluster.Builder(2, createTempDir())
-            .addConfig("config", 
EncryptionTestUtil.getConfigPath("collection1"))
-            .configure();
-  }
+public class KmsEncryptionRequestHandlerTest extends 
EncryptionRequestHandlerTest {
 
-  @AfterClass
-  public static void afterClass() throws Exception {
-    cluster.shutdown();
+  static {
+    configDir = "kms";
   }
 
   @Test(expected = SolrException.class)
@@ -48,4 +36,16 @@ public class KmsEncryptionRequestHandlerTest extends 
SolrCloudTestCase {
     params.set(PARAM_ENCRYPTION_KEY_BLOB, "keyBlob");
     cluster.getSolrClient().request(new 
GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params));
   }
+
+  @Ignore
+  @Override
+  // Do not test timeout because the "kms" config does not define the 
TestingEncryptionRequestHandler
+  // required for the test.
+  public void testDistributionTimeout() {}
+
+  @Ignore
+  @Test
+  // Do not test state because the "kms" config does not define the 
TestingEncryptionRequestHandler
+  // required for the test.
+  public void testDistributionState() {}
 }
\ No newline at end of file

Reply via email to