This is an automated email from the ASF dual-hosted git repository.

broustant pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 5266383  Refactor EncryptionRequestHandler to split large methods. 
(#120)
5266383 is described below

commit 526638363c43f323b2c89f76a58fff445c5e8e30
Author: Bruno Roustant <[email protected]>
AuthorDate: Tue Aug 5 16:07:41 2025 +0200

    Refactor EncryptionRequestHandler to split large methods. (#120)
---
 .../solr/encryption/EncryptionRequestHandler.java  | 516 ++++++++++++---------
 1 file changed, 309 insertions(+), 207 deletions(-)

diff --git 
a/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
 
b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
index a124e46..d1ada0a 100644
--- 
a/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
+++ 
b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
@@ -258,196 +258,71 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
 
   @Override
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) 
throws Exception {
-    long startTimeNs = getTimeSource().getTimeNs();
+    String keyId = getRequestKeyId(req, rsp);
+    checkEncryptionDirectoryFactory(req);
+    if (req.getParams().getBool(DISTRIB, false)) {
+      distributeRequest(req, rsp, keyId);
+    } else {
+      localRequest(req, rsp, keyId);
+    }
+  }
+
+  private static String getRequestKeyId(SolrQueryRequest req, 
SolrQueryResponse rsp) throws IOException {
     String keyId = req.getParams().get(PARAM_KEY_ID);
     if (keyId == null || keyId.isEmpty()) {
       rsp.add(STATUS, STATUS_FAILURE);
       throw new IOException("Parameter " + PARAM_KEY_ID + " must be present 
and not empty."
-                              + " Use [" + PARAM_KEY_ID + "=\"" + NO_KEY_ID + 
"\"] for explicit decryption.");
+          + " Use [" + PARAM_KEY_ID + "=\"" + NO_KEY_ID + "\"] for explicit 
decryption.");
     } else if (keyId.equals(NO_KEY_ID)) {
       keyId = null;
     }
+    return keyId;
+  }
+
+  private static void checkEncryptionDirectoryFactory(SolrQueryRequest req) {
     // Check the defined DirectoryFactory instance.
     EncryptionDirectoryFactory.getFactory(req.getCore());
+  }
 
-    if (req.getParams().getBool(DISTRIB, false)) {
-      distributeRequest(req, rsp, keyId, startTimeNs);
-      return;
-    }
-
+  private void localRequest(
+      SolrQueryRequest req,
+      SolrQueryResponse rsp,
+      String keyId)
+      throws IOException {
+    long startTimeNs = getTimeSource().getTimeNs();
     log.debug("Encrypt request for keyId={}", keyId);
-    boolean success = false;
-    State state = State.PENDING;
+    RequestStatus requestStatus = RequestStatus.ERROR;
     try {
-      SegmentInfos segmentInfos = readLatestCommit(req.getCore());
-      if (segmentInfos.size() == 0) {
-        commitEmptyIndexForEncryption(keyId, segmentInfos, req, rsp);
-        state = State.COMPLETE;
-        success = true;
-        return;
-      }
-
-      boolean encryptionComplete = false;
-      if (isCommitActiveKeyId(keyId, segmentInfos)) {
-        log.debug("Provided keyId={} is the current active key id", keyId);
-        if 
(Boolean.parseBoolean(segmentInfos.getUserData().get(COMMIT_ENCRYPTION_PENDING)))
 {
-          encryptionComplete = areAllSegmentsEncryptedWithKeyId(keyId, 
req.getCore(), segmentInfos)
-            && areAllLogsEncryptedWithKeyId(keyId, req.getCore(), 
segmentInfos);
-          if (encryptionComplete) {
-            commitEncryptionComplete(keyId, segmentInfos, req);
-          }
-        } else {
-          encryptionComplete = true;
-        }
-      }
-      if (encryptionComplete) {
-        state = State.COMPLETE;
-        success = true;
-        return;
-      }
-
-      synchronized (pendingEncryptionLock) {
-        PendingKeyId pendingKeyId = 
pendingEncryptions.get(req.getCore().getName());
-        if (pendingKeyId != null) {
-          if (Objects.equals(pendingKeyId.keyId, keyId)) {
-            log.debug("Ongoing encryption for keyId={}", keyId);
-            success = true;
-          } else {
-            log.debug("Core busy encrypting for keyId={} different than 
requested keyId={}",
-                      pendingKeyId.keyId, keyId);
-            state = State.BUSY;
-          }
-          return;
-        }
-        pendingEncryptions.put(req.getCore().getName(), new 
PendingKeyId(keyId));
-      }
-      try {
-        commitEncryptionStart(keyId, segmentInfos, req, rsp);
-        encryptAsync(req);
-        success = true;
-      } finally {
-        if (!success) {
-          synchronized (pendingEncryptionLock) {
-            pendingEncryptions.remove(req.getCore().getName());
-          }
-        }
-      }
-
+      requestStatus = handleRequestLocallyInner(req, rsp, keyId);
     } finally {
-      String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
+      String statusValue = requestStatus.success ? STATUS_SUCCESS : 
STATUS_FAILURE;
       rsp.add(STATUS, statusValue);
       rsp.addToLog(STATUS, statusValue);
-      rsp.add(ENCRYPTION_STATE, state.value);
-      rsp.addToLog(ENCRYPTION_STATE, state.value);
+      rsp.add(ENCRYPTION_STATE, requestStatus.state.value);
+      rsp.addToLog(ENCRYPTION_STATE, requestStatus.state.value);
       log.info("Responding encryption state={} success={} for keyId={} 
timeMs={}",
-          state.value, success, keyId, toMs(elapsedTimeNs(startTimeNs)));
+          requestStatus.state.value, requestStatus.success, keyId, 
toMs(elapsedTimeNs(startTimeNs)));
     }
   }
 
-  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId, long startTimeNs) {
-    boolean success = false;
-    State collectionState = null;
-    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
-    TimeOut timeOut = timeAllowedMs <= 0 ? null : new TimeOut(timeAllowedMs, 
TimeUnit.MILLISECONDS, getTimeSource());
-    try {
-      String collectionName = 
req.getCore().getCoreDescriptor().getCollectionName();
-      if (collectionName == null) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
-      }
-      log.debug("Encrypt request distributed for keyId={}", keyId);
-      DocCollection docCollection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
-      if (docCollection == null) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not 
found.");
-      }
-      ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
-      Collection<Slice> slices = docCollection.getActiveSlices();
-      Collection<Callable<State>> encryptRequests = new 
ArrayList<>(slices.size());
-      for (Slice slice : slices) {
-        Replica leader = slice.getLeader();
-        if (leader == null) {
-          log.error("No leader found for shard {}", slice.getName());
-          collectionState = State.ERROR;
-          continue;
-        }
-        encryptRequests.add(() -> sendEncryptionRequestWithRetry(leader, req, 
params, keyId));
-      }
-      try {
-        List<Future<State>> responses = timeOut == null ?
-            executor.invokeAll(encryptRequests)
-            : executor.invokeAll(encryptRequests, 
timeOut.timeLeft(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
-        for (Future<State> response : responses) {
-          State state;
-          try {
-            state = response.get();
-          } catch (ExecutionException e) {
-            log.error("Error distributing encryption request for keyId={}", 
keyId, e);
-            collectionState = State.ERROR;
-            break;
-          } catch (CancellationException e) {
-            log.warn("Cancelled distributing encryption request for keyId={}", 
keyId, e);
-            if (collectionState == null || State.TIMEOUT.priority > 
collectionState.priority) {
-              collectionState = State.TIMEOUT;
-            }
-            break;
-          }
-          if (collectionState == null || state.priority > 
collectionState.priority) {
-            collectionState = state;
-          }
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        collectionState = State.INTERRUPTED;
-      }
-      success = collectionState == null || collectionState.isSuccess();
-    } finally {
-      String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
-      rsp.add(STATUS, statusValue);
-      rsp.addToLog(STATUS, statusValue);
-      if (collectionState != null) {
-        rsp.add(ENCRYPTION_STATE, collectionState.value);
-        rsp.addToLog(ENCRYPTION_STATE, collectionState.value);
-      }
-      if (log.isInfoEnabled()) {
-        log.info("Responding encryption distributed state={} success={} for 
keyId={} timeMs={}",
-            (collectionState == null ? null : collectionState.value), success, 
keyId, toMs(elapsedTimeNs(startTimeNs)));
-      }
-    }
-  }
-
-  protected ModifiableSolrParams 
createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId) {
-    return new ModifiableSolrParams().set(PARAM_KEY_ID, keyId == null ? 
NO_KEY_ID : keyId);
-  }
-
-  private State sendEncryptionRequestWithRetry(
-      Replica replica,
+  private RequestStatus handleRequestLocallyInner(
       SolrQueryRequest req,
-      ModifiableSolrParams params,
-      String keyId) {
-    for (int numAttempts = 0; numAttempts < DISTRIBUTION_MAX_ATTEMPTS; 
numAttempts++) {
-      try {
-        SimpleSolrResponse response = sendEncryptionRequest(replica, req, 
params);
-        Object responseStatus = response.getResponse().get(STATUS);
-        Object responseState = response.getResponse().get(ENCRYPTION_STATE);
-        log.info("Encryption state {} status {} for replica {} keyId {} in {} 
ms", responseStatus, responseState, replica.getName(), keyId, 
response.getElapsedTime());
-        if (responseState != null) {
-          return State.fromValue(responseState.toString());
-        }
-      } catch (SolrServerException | IOException e) {
-        log.error("Error occurred while sending encryption request", e);
-      }
+      SolrQueryResponse rsp,
+      String keyId)
+      throws IOException {
+    SegmentInfos segmentInfos = readLatestCommit(req.getCore());
+    if (segmentInfos.size() == 0) {
+      commitEmptyIndexForEncryption(keyId, segmentInfos, req, rsp);
+      return RequestStatus.COMPLETE;
     }
-    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed 
encryption request to replica " + replica.getName() + " for keyId " + keyId);
-  }
-
-  private SimpleSolrResponse sendEncryptionRequest(
-      Replica replica,
-      SolrQueryRequest req,
-      ModifiableSolrParams params)
-      throws SolrServerException, IOException {
-    // Use the update-only http client, considering encryption is an update as 
we indeed create new Lucene segments.
-    Http2SolrClient solrClient = 
req.getCoreContainer().getUpdateShardHandler().getUpdateOnlyHttpClient();
-    GenericSolrRequest distributedRequest = new 
GenericSolrRequest(SolrRequest.METHOD.POST, req.getPath(), params);
-    return solrClient.requestWithBaseUrl(replica.getCoreUrl(), null, 
distributedRequest);
+    if (checkEncryptionComplete(req, keyId, segmentInfos)) {
+      return RequestStatus.COMPLETE;
+    }
+    RequestStatus ongoingStatus = checkEncryptionOngoing(keyId, req);
+    if (ongoingStatus != null) {
+      return ongoingStatus;
+    }
+    return encryptAsync(req, rsp, keyId, segmentInfos);
   }
 
   private void commitEmptyIndexForEncryption(@Nullable String keyId,
@@ -476,6 +351,79 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
     }
   }
 
+  private void ensureNonEmptyCommitDataForEmptyCommit(Map<String, String> 
commitData) {
+    if (commitData.isEmpty()) {
+      // Ensure that there is some data in the commit user data so that an 
empty commit
+      // (with no change) is allowed.
+      commitData.put("crypto.cleartext", "true");
+    }
+  }
+
+  private boolean checkEncryptionComplete(
+      SolrQueryRequest req,
+      String keyId,
+      SegmentInfos segmentInfos)
+      throws IOException {
+    boolean encryptionComplete = false;
+    if (isCommitActiveKeyId(keyId, segmentInfos)) {
+      log.debug("Provided keyId={} is the current active key id", keyId);
+      if 
(Boolean.parseBoolean(segmentInfos.getUserData().get(COMMIT_ENCRYPTION_PENDING)))
 {
+        encryptionComplete = areAllSegmentsEncryptedWithKeyId(keyId, 
req.getCore(), segmentInfos)
+            && areAllLogsEncryptedWithKeyId(keyId, req.getCore(), 
segmentInfos);
+        if (encryptionComplete) {
+          commitEncryptionComplete(keyId, segmentInfos, req);
+        }
+      } else {
+        encryptionComplete = true;
+      }
+    }
+    return encryptionComplete;
+  }
+
+  private boolean isCommitActiveKeyId(String keyId, SegmentInfos segmentInfos) 
{
+    String keyRef = getActiveKeyRefFromCommit(segmentInfos.getUserData());
+    String activeKeyId = keyRef == null ? null : getKeyIdFromCommit(keyRef, 
segmentInfos.getUserData());
+    return Objects.equals(keyId, activeKeyId);
+  }
+
+  private boolean areAllSegmentsEncryptedWithKeyId(@Nullable String keyId,
+                                                   SolrCore core,
+                                                   SegmentInfos segmentInfos) 
throws IOException {
+    DirectoryFactory directoryFactory = core.getDirectoryFactory();
+    Directory indexDir = directoryFactory.get(core.getIndexDir(),
+        DirectoryFactory.DirContext.DEFAULT,
+        DirectoryFactory.LOCK_TYPE_NONE);
+    try {
+      EncryptionDirectory dir = (EncryptionDirectory) indexDir;
+      List<SegmentCommitInfo> segmentsWithOldKeyId = 
dir.getSegmentsWithOldKeyId(segmentInfos, keyId);
+      log.debug("Encryption is pending; {} segments do not have keyId={}",
+          segmentsWithOldKeyId.size(), keyId);
+      return segmentsWithOldKeyId.isEmpty();
+    } finally {
+      directoryFactory.release(indexDir);
+    }
+  }
+
+  private boolean areAllLogsEncryptedWithKeyId(String keyId, SolrCore core, 
SegmentInfos segmentInfos)
+      throws IOException {
+    EncryptionUpdateLog updateLog = getEncryptionUpdateLog(core);
+    return updateLog == null || updateLog.areAllLogsEncryptedWithKeyId(keyId, 
segmentInfos.getUserData());
+  }
+
+  private EncryptionUpdateLog getEncryptionUpdateLog(SolrCore core) {
+    UpdateHandler updateHandler = core.getUpdateHandler();
+    if (updateHandler == null) {
+      return null;
+    }
+    if (!(updateHandler.getUpdateLog() instanceof EncryptionUpdateLog)) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+          UpdateLog.class.getSimpleName()
+              + " must be configured with an "
+              + EncryptionUpdateLog.class.getSimpleName());
+    }
+    return (EncryptionUpdateLog) updateHandler.getUpdateLog();
+  }
+
   private void commitEncryptionComplete(String keyId,
                                         SegmentInfos segmentInfos,
                                         SolrQueryRequest req) throws 
IOException {
@@ -491,11 +439,41 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
     req.getCore().getUpdateHandler().commit(commitCmd);
   }
 
-  private void ensureNonEmptyCommitDataForEmptyCommit(Map<String, String> 
commitData) {
-    if (commitData.isEmpty()) {
-      // Ensure that there is some data in the commit user data so that an 
empty commit
-      // (with no change) is allowed.
-      commitData.put("crypto.cleartext", "true");
+  private RequestStatus checkEncryptionOngoing(String keyId, SolrQueryRequest 
req) {
+    synchronized (pendingEncryptionLock) {
+      PendingKeyId pendingKeyId = 
pendingEncryptions.get(req.getCore().getName());
+      if (pendingKeyId != null) {
+        if (Objects.equals(pendingKeyId.keyId, keyId)) {
+          log.debug("Ongoing encryption for keyId={}", keyId);
+          return RequestStatus.PENDING;
+        }
+        log.debug("Core busy encrypting for keyId={} different than requested 
keyId={}",
+            pendingKeyId.keyId, keyId);
+        return RequestStatus.BUSY;
+      }
+      pendingEncryptions.put(req.getCore().getName(), new PendingKeyId(keyId));
+    }
+    return null;
+  }
+
+  private RequestStatus encryptAsync(
+      SolrQueryRequest req,
+      SolrQueryResponse rsp,
+      String keyId,
+      SegmentInfos segmentInfos)
+      throws IOException {
+    boolean success = false;
+    try {
+      commitEncryptionStart(keyId, segmentInfos, req, rsp);
+      encryptAsyncNoCommit(req);
+      success = true;
+      return RequestStatus.PENDING;
+    } finally {
+      if (!success) {
+        synchronized (pendingEncryptionLock) {
+          pendingEncryptions.remove(req.getCore().getName());
+        }
+      }
     }
   }
 
@@ -511,7 +489,7 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
     req.getCore().getUpdateHandler().commit(commitCmd);
   }
 
-  private void encryptAsync(SolrQueryRequest req) {
+  private void encryptAsyncNoCommit(SolrQueryRequest req) {
     log.debug("Submitting async encryption");
     executor.submit(() -> {
       try {
@@ -521,7 +499,7 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
           long startTimeNs = getTimeSource().getTimeNs();
           boolean logEncryptionComplete = updateLog.encryptLogs();
           log.info("{} encrypted the update log in {} ms",
-                  logEncryptionComplete ? "Successfully" : "Partially", 
toMs(elapsedTimeNs(startTimeNs)));
+              logEncryptionComplete ? "Successfully" : "Partially", 
toMs(elapsedTimeNs(startTimeNs)));
           // If the logs encryption is not complete, it means some logs are 
currently in use.
           // The encryption will be automatically be retried after the next 
commit which should
           // release the old transaction log and make it ready for encryption.
@@ -547,48 +525,146 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
     });
   }
 
-  private boolean areAllSegmentsEncryptedWithKeyId(@Nullable String keyId,
-                                                   SolrCore core,
-                                                   SegmentInfos segmentInfos) 
throws IOException {
-    DirectoryFactory directoryFactory = core.getDirectoryFactory();
-    Directory indexDir = directoryFactory.get(core.getIndexDir(),
-                                              
DirectoryFactory.DirContext.DEFAULT,
-                                              DirectoryFactory.LOCK_TYPE_NONE);
+  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId) {
+    long startTimeNs = getTimeSource().getTimeNs();
+    RequestStatus collectionStatus = RequestStatus.ERROR;
     try {
-      EncryptionDirectory dir = (EncryptionDirectory) indexDir;
-      List<SegmentCommitInfo> segmentsWithOldKeyId = 
dir.getSegmentsWithOldKeyId(segmentInfos, keyId);
-      log.debug("Encryption is pending; {} segments do not have keyId={}",
-                segmentsWithOldKeyId.size(), keyId);
-      return segmentsWithOldKeyId.isEmpty();
+      collectionStatus = distributeRequestInner(req, rsp, keyId);
     } finally {
-      directoryFactory.release(indexDir);
+      String statusValue = collectionStatus.success ? STATUS_SUCCESS : 
STATUS_FAILURE;
+      rsp.add(STATUS, statusValue);
+      rsp.addToLog(STATUS, statusValue);
+      rsp.add(ENCRYPTION_STATE, collectionStatus.state.value);
+      rsp.addToLog(ENCRYPTION_STATE, collectionStatus.state.value);
+      if (log.isInfoEnabled()) {
+        log.info("Responding encryption distributed state={} success={} for 
keyId={} timeMs={}",
+            collectionStatus.state.value, collectionStatus.success, keyId, 
toMs(elapsedTimeNs(startTimeNs)));
+      }
     }
   }
 
-  private boolean areAllLogsEncryptedWithKeyId(String keyId, SolrCore core, 
SegmentInfos segmentInfos)
-    throws IOException {
-    EncryptionUpdateLog updateLog = getEncryptionUpdateLog(core);
-    return updateLog == null || updateLog.areAllLogsEncryptedWithKeyId(keyId, 
segmentInfos.getUserData());
+  private RequestStatus distributeRequestInner(
+      SolrQueryRequest req,
+      SolrQueryResponse rsp,
+      String keyId) {
+    log.debug("Encrypt request distributed for keyId={}", keyId);
+    TimeOut timeOut = getTimeOut(req);
+    StateHolder collectionState = new StateHolder();
+    Collection<Callable<State>> encryptRequests = 
prepareEncryptionRequests(req, rsp, keyId, collectionState);
+    sendEncryptionRequests(encryptRequests, timeOut, keyId, collectionState);
+    return collectionState.state == null ?
+        RequestStatus.COMPLETE
+        : new RequestStatus(collectionState.state, 
collectionState.state.isSuccess());
   }
 
-  private EncryptionUpdateLog getEncryptionUpdateLog(SolrCore core) {
-    UpdateHandler updateHandler = core.getUpdateHandler();
-    if (updateHandler == null) {
-      return null;
+  private TimeOut getTimeOut(SolrQueryRequest req) {
+    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+    return timeAllowedMs <= 0 ?
+        null
+        : new TimeOut(timeAllowedMs, TimeUnit.MILLISECONDS, getTimeSource());
+  }
+
+  private Collection<Callable<State>> prepareEncryptionRequests(
+      SolrQueryRequest req,
+      SolrQueryResponse rsp,
+      String keyId,
+      StateHolder collectionState) {
+    DocCollection docCollection = getCollection(req);
+    ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
+    Collection<Slice> slices = docCollection.getActiveSlices();
+    Collection<Callable<State>> encryptRequests = new 
ArrayList<>(slices.size());
+    for (Slice slice : slices) {
+      Replica leader = slice.getLeader();
+      if (leader == null) {
+        log.error("No leader found for shard {}", slice.getName());
+        collectionState.state = State.ERROR;
+      } else {
+        encryptRequests.add(() -> sendEncryptionRequestWithRetry(leader, req, 
params, keyId));
+      }
     }
-    if (!(updateHandler.getUpdateLog() instanceof EncryptionUpdateLog)) {
-      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-                              UpdateLog.class.getSimpleName()
-                                + " must be configured with an "
-                                + EncryptionUpdateLog.class.getSimpleName());
+    return encryptRequests;
+  }
+
+  private static DocCollection getCollection(SolrQueryRequest req) {
+    String collectionName = 
req.getCore().getCoreDescriptor().getCollectionName();
+    if (collectionName == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter 
" + DISTRIB + " can only be used in Solr Cloud mode.");
     }
-    return (EncryptionUpdateLog) updateHandler.getUpdateLog();
+    DocCollection collection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+    if (collection == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter 
" + DISTRIB + " present but collection '" + collectionName + "' not found.");
+    }
+    return collection;
   }
 
-  private boolean isCommitActiveKeyId(String keyId, SegmentInfos segmentInfos) 
{
-    String keyRef = getActiveKeyRefFromCommit(segmentInfos.getUserData());
-    String activeKeyId = keyRef == null ? null : getKeyIdFromCommit(keyRef, 
segmentInfos.getUserData());
-    return Objects.equals(keyId, activeKeyId);
+  private void sendEncryptionRequests(
+      Collection<Callable<State>> encryptRequests,
+      TimeOut timeOut,
+      String keyId,
+      StateHolder collectionState) {
+    try {
+      List<Future<State>> responses = timeOut == null ?
+          executor.invokeAll(encryptRequests)
+          : executor.invokeAll(encryptRequests, 
timeOut.timeLeft(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+      for (Future<State> response : responses) {
+        State state;
+        try {
+          state = response.get();
+        } catch (ExecutionException e) {
+          log.error("Error distributing encryption request for keyId={}", 
keyId, e);
+          collectionState.state = State.ERROR;
+          break;
+        } catch (CancellationException e) {
+          log.warn("Cancelled distributing encryption request for keyId={}", 
keyId, e);
+          if (collectionState.state == null || State.TIMEOUT.priority > 
collectionState.state.priority) {
+            collectionState.state = State.TIMEOUT;
+          }
+          break;
+        }
+        if (collectionState.state == null || state.priority > 
collectionState.state.priority) {
+          collectionState.state = state;
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      collectionState.state = State.INTERRUPTED;
+    }
+  }
+
+  protected ModifiableSolrParams 
createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId) {
+    return new ModifiableSolrParams().set(PARAM_KEY_ID, keyId == null ? 
NO_KEY_ID : keyId);
+  }
+
+  private State sendEncryptionRequestWithRetry(
+      Replica replica,
+      SolrQueryRequest req,
+      ModifiableSolrParams params,
+      String keyId) {
+    for (int numAttempts = 0; numAttempts < DISTRIBUTION_MAX_ATTEMPTS; 
numAttempts++) {
+      try {
+        SimpleSolrResponse response = sendEncryptionRequest(replica, req, 
params);
+        Object responseStatus = response.getResponse().get(STATUS);
+        Object responseState = response.getResponse().get(ENCRYPTION_STATE);
+        log.info("Encryption state {} status {} for replica {} keyId {} in {} 
ms", responseStatus, responseState, replica.getName(), keyId, 
response.getElapsedTime());
+        if (responseState != null) {
+          return State.fromValue(responseState.toString());
+        }
+      } catch (SolrServerException | IOException e) {
+        log.error("Error occurred while sending encryption request", e);
+      }
+    }
+    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed 
encryption request to replica " + replica.getName() + " for keyId " + keyId);
+  }
+
+  private SimpleSolrResponse sendEncryptionRequest(
+      Replica replica,
+      SolrQueryRequest req,
+      ModifiableSolrParams params)
+      throws SolrServerException, IOException {
+    // Use the update-only http client, considering encryption is an update as 
we indeed create new Lucene segments.
+    Http2SolrClient solrClient = 
req.getCoreContainer().getUpdateShardHandler().getUpdateOnlyHttpClient();
+    GenericSolrRequest distributedRequest = new 
GenericSolrRequest(SolrRequest.METHOD.POST, req.getPath(), params);
+    return solrClient.requestWithBaseUrl(replica.getCoreUrl(), null, 
distributedRequest);
   }
 
   private long elapsedTimeNs(long startTimeNs) {
@@ -615,4 +691,30 @@ public class EncryptionRequestHandler extends 
RequestHandlerBase {
       this.keyId = keyId;
     }
   }
+
+  /**
+   * Internal encryption request status.
+   */
+  private static class RequestStatus {
+
+    static final RequestStatus ERROR = new RequestStatus(State.PENDING, false);
+    static final RequestStatus COMPLETE = new RequestStatus(State.COMPLETE, 
true);
+    static final RequestStatus PENDING = new RequestStatus(State.PENDING, 
true);
+    static final RequestStatus BUSY = new RequestStatus(State.BUSY, false);
+
+    final State state;
+    final boolean success;
+
+    RequestStatus(State state, boolean success) {
+      this.state = state;
+      this.success = success;
+    }
+  }
+
+  /**
+   * Holds a mutable {@link State}.
+   */
+  private static class StateHolder {
+    State state;
+  }
 }

Reply via email to