This is an automated email from the ASF dual-hosted git repository.
broustant pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 5266383 Refactor EncryptionRequestHandler to split large methods.
(#120)
5266383 is described below
commit 526638363c43f323b2c89f76a58fff445c5e8e30
Author: Bruno Roustant <[email protected]>
AuthorDate: Tue Aug 5 16:07:41 2025 +0200
Refactor EncryptionRequestHandler to split large methods. (#120)
---
.../solr/encryption/EncryptionRequestHandler.java | 516 ++++++++++++---------
1 file changed, 309 insertions(+), 207 deletions(-)
diff --git
a/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
index a124e46..d1ada0a 100644
---
a/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
+++
b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
@@ -258,196 +258,71 @@ public class EncryptionRequestHandler extends
RequestHandlerBase {
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
throws Exception {
- long startTimeNs = getTimeSource().getTimeNs();
+ String keyId = getRequestKeyId(req, rsp);
+ checkEncryptionDirectoryFactory(req);
+ if (req.getParams().getBool(DISTRIB, false)) {
+ distributeRequest(req, rsp, keyId);
+ } else {
+ localRequest(req, rsp, keyId);
+ }
+ }
+
+ private static String getRequestKeyId(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException {
String keyId = req.getParams().get(PARAM_KEY_ID);
if (keyId == null || keyId.isEmpty()) {
rsp.add(STATUS, STATUS_FAILURE);
throw new IOException("Parameter " + PARAM_KEY_ID + " must be present
and not empty."
- + " Use [" + PARAM_KEY_ID + "=\"" + NO_KEY_ID +
"\"] for explicit decryption.");
+ + " Use [" + PARAM_KEY_ID + "=\"" + NO_KEY_ID + "\"] for explicit
decryption.");
} else if (keyId.equals(NO_KEY_ID)) {
keyId = null;
}
+ return keyId;
+ }
+
+ private static void checkEncryptionDirectoryFactory(SolrQueryRequest req) {
// Check the defined DirectoryFactory instance.
EncryptionDirectoryFactory.getFactory(req.getCore());
+ }
- if (req.getParams().getBool(DISTRIB, false)) {
- distributeRequest(req, rsp, keyId, startTimeNs);
- return;
- }
-
+ private void localRequest(
+ SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ String keyId)
+ throws IOException {
+ long startTimeNs = getTimeSource().getTimeNs();
log.debug("Encrypt request for keyId={}", keyId);
- boolean success = false;
- State state = State.PENDING;
+ RequestStatus requestStatus = RequestStatus.ERROR;
try {
- SegmentInfos segmentInfos = readLatestCommit(req.getCore());
- if (segmentInfos.size() == 0) {
- commitEmptyIndexForEncryption(keyId, segmentInfos, req, rsp);
- state = State.COMPLETE;
- success = true;
- return;
- }
-
- boolean encryptionComplete = false;
- if (isCommitActiveKeyId(keyId, segmentInfos)) {
- log.debug("Provided keyId={} is the current active key id", keyId);
- if
(Boolean.parseBoolean(segmentInfos.getUserData().get(COMMIT_ENCRYPTION_PENDING)))
{
- encryptionComplete = areAllSegmentsEncryptedWithKeyId(keyId,
req.getCore(), segmentInfos)
- && areAllLogsEncryptedWithKeyId(keyId, req.getCore(),
segmentInfos);
- if (encryptionComplete) {
- commitEncryptionComplete(keyId, segmentInfos, req);
- }
- } else {
- encryptionComplete = true;
- }
- }
- if (encryptionComplete) {
- state = State.COMPLETE;
- success = true;
- return;
- }
-
- synchronized (pendingEncryptionLock) {
- PendingKeyId pendingKeyId =
pendingEncryptions.get(req.getCore().getName());
- if (pendingKeyId != null) {
- if (Objects.equals(pendingKeyId.keyId, keyId)) {
- log.debug("Ongoing encryption for keyId={}", keyId);
- success = true;
- } else {
- log.debug("Core busy encrypting for keyId={} different than
requested keyId={}",
- pendingKeyId.keyId, keyId);
- state = State.BUSY;
- }
- return;
- }
- pendingEncryptions.put(req.getCore().getName(), new
PendingKeyId(keyId));
- }
- try {
- commitEncryptionStart(keyId, segmentInfos, req, rsp);
- encryptAsync(req);
- success = true;
- } finally {
- if (!success) {
- synchronized (pendingEncryptionLock) {
- pendingEncryptions.remove(req.getCore().getName());
- }
- }
- }
-
+ requestStatus = handleRequestLocallyInner(req, rsp, keyId);
} finally {
- String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
+ String statusValue = requestStatus.success ? STATUS_SUCCESS :
STATUS_FAILURE;
rsp.add(STATUS, statusValue);
rsp.addToLog(STATUS, statusValue);
- rsp.add(ENCRYPTION_STATE, state.value);
- rsp.addToLog(ENCRYPTION_STATE, state.value);
+ rsp.add(ENCRYPTION_STATE, requestStatus.state.value);
+ rsp.addToLog(ENCRYPTION_STATE, requestStatus.state.value);
log.info("Responding encryption state={} success={} for keyId={}
timeMs={}",
- state.value, success, keyId, toMs(elapsedTimeNs(startTimeNs)));
+ requestStatus.state.value, requestStatus.success, keyId,
toMs(elapsedTimeNs(startTimeNs)));
}
}
- private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp,
String keyId, long startTimeNs) {
- boolean success = false;
- State collectionState = null;
- long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
- TimeOut timeOut = timeAllowedMs <= 0 ? null : new TimeOut(timeAllowedMs,
TimeUnit.MILLISECONDS, getTimeSource());
- try {
- String collectionName =
req.getCore().getCoreDescriptor().getCollectionName();
- if (collectionName == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
- }
- log.debug("Encrypt request distributed for keyId={}", keyId);
- DocCollection docCollection =
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
- if (docCollection == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not
found.");
- }
- ModifiableSolrParams params = createDistributedRequestParams(req, rsp,
keyId);
- Collection<Slice> slices = docCollection.getActiveSlices();
- Collection<Callable<State>> encryptRequests = new
ArrayList<>(slices.size());
- for (Slice slice : slices) {
- Replica leader = slice.getLeader();
- if (leader == null) {
- log.error("No leader found for shard {}", slice.getName());
- collectionState = State.ERROR;
- continue;
- }
- encryptRequests.add(() -> sendEncryptionRequestWithRetry(leader, req,
params, keyId));
- }
- try {
- List<Future<State>> responses = timeOut == null ?
- executor.invokeAll(encryptRequests)
- : executor.invokeAll(encryptRequests,
timeOut.timeLeft(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
- for (Future<State> response : responses) {
- State state;
- try {
- state = response.get();
- } catch (ExecutionException e) {
- log.error("Error distributing encryption request for keyId={}",
keyId, e);
- collectionState = State.ERROR;
- break;
- } catch (CancellationException e) {
- log.warn("Cancelled distributing encryption request for keyId={}",
keyId, e);
- if (collectionState == null || State.TIMEOUT.priority >
collectionState.priority) {
- collectionState = State.TIMEOUT;
- }
- break;
- }
- if (collectionState == null || state.priority >
collectionState.priority) {
- collectionState = state;
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- collectionState = State.INTERRUPTED;
- }
- success = collectionState == null || collectionState.isSuccess();
- } finally {
- String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
- rsp.add(STATUS, statusValue);
- rsp.addToLog(STATUS, statusValue);
- if (collectionState != null) {
- rsp.add(ENCRYPTION_STATE, collectionState.value);
- rsp.addToLog(ENCRYPTION_STATE, collectionState.value);
- }
- if (log.isInfoEnabled()) {
- log.info("Responding encryption distributed state={} success={} for
keyId={} timeMs={}",
- (collectionState == null ? null : collectionState.value), success,
keyId, toMs(elapsedTimeNs(startTimeNs)));
- }
- }
- }
-
- protected ModifiableSolrParams
createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp,
String keyId) {
- return new ModifiableSolrParams().set(PARAM_KEY_ID, keyId == null ?
NO_KEY_ID : keyId);
- }
-
- private State sendEncryptionRequestWithRetry(
- Replica replica,
+ private RequestStatus handleRequestLocallyInner(
SolrQueryRequest req,
- ModifiableSolrParams params,
- String keyId) {
- for (int numAttempts = 0; numAttempts < DISTRIBUTION_MAX_ATTEMPTS;
numAttempts++) {
- try {
- SimpleSolrResponse response = sendEncryptionRequest(replica, req,
params);
- Object responseStatus = response.getResponse().get(STATUS);
- Object responseState = response.getResponse().get(ENCRYPTION_STATE);
- log.info("Encryption state {} status {} for replica {} keyId {} in {}
ms", responseStatus, responseState, replica.getName(), keyId,
response.getElapsedTime());
- if (responseState != null) {
- return State.fromValue(responseState.toString());
- }
- } catch (SolrServerException | IOException e) {
- log.error("Error occurred while sending encryption request", e);
- }
+ SolrQueryResponse rsp,
+ String keyId)
+ throws IOException {
+ SegmentInfos segmentInfos = readLatestCommit(req.getCore());
+ if (segmentInfos.size() == 0) {
+ commitEmptyIndexForEncryption(keyId, segmentInfos, req, rsp);
+ return RequestStatus.COMPLETE;
}
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed
encryption request to replica " + replica.getName() + " for keyId " + keyId);
- }
-
- private SimpleSolrResponse sendEncryptionRequest(
- Replica replica,
- SolrQueryRequest req,
- ModifiableSolrParams params)
- throws SolrServerException, IOException {
- // Use the update-only http client, considering encryption is an update as
we indeed create new Lucene segments.
- Http2SolrClient solrClient =
req.getCoreContainer().getUpdateShardHandler().getUpdateOnlyHttpClient();
- GenericSolrRequest distributedRequest = new
GenericSolrRequest(SolrRequest.METHOD.POST, req.getPath(), params);
- return solrClient.requestWithBaseUrl(replica.getCoreUrl(), null,
distributedRequest);
+ if (checkEncryptionComplete(req, keyId, segmentInfos)) {
+ return RequestStatus.COMPLETE;
+ }
+ RequestStatus ongoingStatus = checkEncryptionOngoing(keyId, req);
+ if (ongoingStatus != null) {
+ return ongoingStatus;
+ }
+ return encryptAsync(req, rsp, keyId, segmentInfos);
}
private void commitEmptyIndexForEncryption(@Nullable String keyId,
@@ -476,6 +351,79 @@ public class EncryptionRequestHandler extends
RequestHandlerBase {
}
}
+ private void ensureNonEmptyCommitDataForEmptyCommit(Map<String, String>
commitData) {
+ if (commitData.isEmpty()) {
+ // Ensure that there is some data in the commit user data so that an
empty commit
+ // (with no change) is allowed.
+ commitData.put("crypto.cleartext", "true");
+ }
+ }
+
+ private boolean checkEncryptionComplete(
+ SolrQueryRequest req,
+ String keyId,
+ SegmentInfos segmentInfos)
+ throws IOException {
+ boolean encryptionComplete = false;
+ if (isCommitActiveKeyId(keyId, segmentInfos)) {
+ log.debug("Provided keyId={} is the current active key id", keyId);
+ if
(Boolean.parseBoolean(segmentInfos.getUserData().get(COMMIT_ENCRYPTION_PENDING)))
{
+ encryptionComplete = areAllSegmentsEncryptedWithKeyId(keyId,
req.getCore(), segmentInfos)
+ && areAllLogsEncryptedWithKeyId(keyId, req.getCore(),
segmentInfos);
+ if (encryptionComplete) {
+ commitEncryptionComplete(keyId, segmentInfos, req);
+ }
+ } else {
+ encryptionComplete = true;
+ }
+ }
+ return encryptionComplete;
+ }
+
+ private boolean isCommitActiveKeyId(String keyId, SegmentInfos segmentInfos)
{
+ String keyRef = getActiveKeyRefFromCommit(segmentInfos.getUserData());
+ String activeKeyId = keyRef == null ? null : getKeyIdFromCommit(keyRef,
segmentInfos.getUserData());
+ return Objects.equals(keyId, activeKeyId);
+ }
+
+ private boolean areAllSegmentsEncryptedWithKeyId(@Nullable String keyId,
+ SolrCore core,
+ SegmentInfos segmentInfos)
throws IOException {
+ DirectoryFactory directoryFactory = core.getDirectoryFactory();
+ Directory indexDir = directoryFactory.get(core.getIndexDir(),
+ DirectoryFactory.DirContext.DEFAULT,
+ DirectoryFactory.LOCK_TYPE_NONE);
+ try {
+ EncryptionDirectory dir = (EncryptionDirectory) indexDir;
+ List<SegmentCommitInfo> segmentsWithOldKeyId =
dir.getSegmentsWithOldKeyId(segmentInfos, keyId);
+ log.debug("Encryption is pending; {} segments do not have keyId={}",
+ segmentsWithOldKeyId.size(), keyId);
+ return segmentsWithOldKeyId.isEmpty();
+ } finally {
+ directoryFactory.release(indexDir);
+ }
+ }
+
+ private boolean areAllLogsEncryptedWithKeyId(String keyId, SolrCore core,
SegmentInfos segmentInfos)
+ throws IOException {
+ EncryptionUpdateLog updateLog = getEncryptionUpdateLog(core);
+ return updateLog == null || updateLog.areAllLogsEncryptedWithKeyId(keyId,
segmentInfos.getUserData());
+ }
+
+ private EncryptionUpdateLog getEncryptionUpdateLog(SolrCore core) {
+ UpdateHandler updateHandler = core.getUpdateHandler();
+ if (updateHandler == null) {
+ return null;
+ }
+ if (!(updateHandler.getUpdateLog() instanceof EncryptionUpdateLog)) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ UpdateLog.class.getSimpleName()
+ + " must be configured with an "
+ + EncryptionUpdateLog.class.getSimpleName());
+ }
+ return (EncryptionUpdateLog) updateHandler.getUpdateLog();
+ }
+
private void commitEncryptionComplete(String keyId,
SegmentInfos segmentInfos,
SolrQueryRequest req) throws
IOException {
@@ -491,11 +439,41 @@ public class EncryptionRequestHandler extends
RequestHandlerBase {
req.getCore().getUpdateHandler().commit(commitCmd);
}
- private void ensureNonEmptyCommitDataForEmptyCommit(Map<String, String>
commitData) {
- if (commitData.isEmpty()) {
- // Ensure that there is some data in the commit user data so that an
empty commit
- // (with no change) is allowed.
- commitData.put("crypto.cleartext", "true");
+ private RequestStatus checkEncryptionOngoing(String keyId, SolrQueryRequest
req) {
+ synchronized (pendingEncryptionLock) {
+ PendingKeyId pendingKeyId =
pendingEncryptions.get(req.getCore().getName());
+ if (pendingKeyId != null) {
+ if (Objects.equals(pendingKeyId.keyId, keyId)) {
+ log.debug("Ongoing encryption for keyId={}", keyId);
+ return RequestStatus.PENDING;
+ }
+ log.debug("Core busy encrypting for keyId={} different than requested
keyId={}",
+ pendingKeyId.keyId, keyId);
+ return RequestStatus.BUSY;
+ }
+ pendingEncryptions.put(req.getCore().getName(), new PendingKeyId(keyId));
+ }
+ return null;
+ }
+
+ private RequestStatus encryptAsync(
+ SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ String keyId,
+ SegmentInfos segmentInfos)
+ throws IOException {
+ boolean success = false;
+ try {
+ commitEncryptionStart(keyId, segmentInfos, req, rsp);
+ encryptAsyncNoCommit(req);
+ success = true;
+ return RequestStatus.PENDING;
+ } finally {
+ if (!success) {
+ synchronized (pendingEncryptionLock) {
+ pendingEncryptions.remove(req.getCore().getName());
+ }
+ }
}
}
@@ -511,7 +489,7 @@ public class EncryptionRequestHandler extends
RequestHandlerBase {
req.getCore().getUpdateHandler().commit(commitCmd);
}
- private void encryptAsync(SolrQueryRequest req) {
+ private void encryptAsyncNoCommit(SolrQueryRequest req) {
log.debug("Submitting async encryption");
executor.submit(() -> {
try {
@@ -521,7 +499,7 @@ public class EncryptionRequestHandler extends
RequestHandlerBase {
long startTimeNs = getTimeSource().getTimeNs();
boolean logEncryptionComplete = updateLog.encryptLogs();
log.info("{} encrypted the update log in {} ms",
- logEncryptionComplete ? "Successfully" : "Partially",
toMs(elapsedTimeNs(startTimeNs)));
+ logEncryptionComplete ? "Successfully" : "Partially",
toMs(elapsedTimeNs(startTimeNs)));
// If the logs encryption is not complete, it means some logs are
currently in use.
// The encryption will be automatically be retried after the next
commit which should
// release the old transaction log and make it ready for encryption.
@@ -547,48 +525,146 @@ public class EncryptionRequestHandler extends
RequestHandlerBase {
});
}
- private boolean areAllSegmentsEncryptedWithKeyId(@Nullable String keyId,
- SolrCore core,
- SegmentInfos segmentInfos)
throws IOException {
- DirectoryFactory directoryFactory = core.getDirectoryFactory();
- Directory indexDir = directoryFactory.get(core.getIndexDir(),
-
DirectoryFactory.DirContext.DEFAULT,
- DirectoryFactory.LOCK_TYPE_NONE);
+ private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp,
String keyId) {
+ long startTimeNs = getTimeSource().getTimeNs();
+ RequestStatus collectionStatus = RequestStatus.ERROR;
try {
- EncryptionDirectory dir = (EncryptionDirectory) indexDir;
- List<SegmentCommitInfo> segmentsWithOldKeyId =
dir.getSegmentsWithOldKeyId(segmentInfos, keyId);
- log.debug("Encryption is pending; {} segments do not have keyId={}",
- segmentsWithOldKeyId.size(), keyId);
- return segmentsWithOldKeyId.isEmpty();
+ collectionStatus = distributeRequestInner(req, rsp, keyId);
} finally {
- directoryFactory.release(indexDir);
+ String statusValue = collectionStatus.success ? STATUS_SUCCESS :
STATUS_FAILURE;
+ rsp.add(STATUS, statusValue);
+ rsp.addToLog(STATUS, statusValue);
+ rsp.add(ENCRYPTION_STATE, collectionStatus.state.value);
+ rsp.addToLog(ENCRYPTION_STATE, collectionStatus.state.value);
+ if (log.isInfoEnabled()) {
+ log.info("Responding encryption distributed state={} success={} for
keyId={} timeMs={}",
+ collectionStatus.state.value, collectionStatus.success, keyId,
toMs(elapsedTimeNs(startTimeNs)));
+ }
}
}
- private boolean areAllLogsEncryptedWithKeyId(String keyId, SolrCore core,
SegmentInfos segmentInfos)
- throws IOException {
- EncryptionUpdateLog updateLog = getEncryptionUpdateLog(core);
- return updateLog == null || updateLog.areAllLogsEncryptedWithKeyId(keyId,
segmentInfos.getUserData());
+ private RequestStatus distributeRequestInner(
+ SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ String keyId) {
+ log.debug("Encrypt request distributed for keyId={}", keyId);
+ TimeOut timeOut = getTimeOut(req);
+ StateHolder collectionState = new StateHolder();
+ Collection<Callable<State>> encryptRequests =
prepareEncryptionRequests(req, rsp, keyId, collectionState);
+ sendEncryptionRequests(encryptRequests, timeOut, keyId, collectionState);
+ return collectionState.state == null ?
+ RequestStatus.COMPLETE
+ : new RequestStatus(collectionState.state,
collectionState.state.isSuccess());
}
- private EncryptionUpdateLog getEncryptionUpdateLog(SolrCore core) {
- UpdateHandler updateHandler = core.getUpdateHandler();
- if (updateHandler == null) {
- return null;
+ private TimeOut getTimeOut(SolrQueryRequest req) {
+ long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+ return timeAllowedMs <= 0 ?
+ null
+ : new TimeOut(timeAllowedMs, TimeUnit.MILLISECONDS, getTimeSource());
+ }
+
+ private Collection<Callable<State>> prepareEncryptionRequests(
+ SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ String keyId,
+ StateHolder collectionState) {
+ DocCollection docCollection = getCollection(req);
+ ModifiableSolrParams params = createDistributedRequestParams(req, rsp,
keyId);
+ Collection<Slice> slices = docCollection.getActiveSlices();
+ Collection<Callable<State>> encryptRequests = new
ArrayList<>(slices.size());
+ for (Slice slice : slices) {
+ Replica leader = slice.getLeader();
+ if (leader == null) {
+ log.error("No leader found for shard {}", slice.getName());
+ collectionState.state = State.ERROR;
+ } else {
+ encryptRequests.add(() -> sendEncryptionRequestWithRetry(leader, req,
params, keyId));
+ }
}
- if (!(updateHandler.getUpdateLog() instanceof EncryptionUpdateLog)) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- UpdateLog.class.getSimpleName()
- + " must be configured with an "
- + EncryptionUpdateLog.class.getSimpleName());
+ return encryptRequests;
+ }
+
+ private static DocCollection getCollection(SolrQueryRequest req) {
+ String collectionName =
req.getCore().getCoreDescriptor().getCollectionName();
+ if (collectionName == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter
" + DISTRIB + " can only be used in Solr Cloud mode.");
}
- return (EncryptionUpdateLog) updateHandler.getUpdateLog();
+ DocCollection collection =
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+ if (collection == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter
" + DISTRIB + " present but collection '" + collectionName + "' not found.");
+ }
+ return collection;
}
- private boolean isCommitActiveKeyId(String keyId, SegmentInfos segmentInfos)
{
- String keyRef = getActiveKeyRefFromCommit(segmentInfos.getUserData());
- String activeKeyId = keyRef == null ? null : getKeyIdFromCommit(keyRef,
segmentInfos.getUserData());
- return Objects.equals(keyId, activeKeyId);
+ private void sendEncryptionRequests(
+ Collection<Callable<State>> encryptRequests,
+ TimeOut timeOut,
+ String keyId,
+ StateHolder collectionState) {
+ try {
+ List<Future<State>> responses = timeOut == null ?
+ executor.invokeAll(encryptRequests)
+ : executor.invokeAll(encryptRequests,
timeOut.timeLeft(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ for (Future<State> response : responses) {
+ State state;
+ try {
+ state = response.get();
+ } catch (ExecutionException e) {
+ log.error("Error distributing encryption request for keyId={}",
keyId, e);
+ collectionState.state = State.ERROR;
+ break;
+ } catch (CancellationException e) {
+ log.warn("Cancelled distributing encryption request for keyId={}",
keyId, e);
+ if (collectionState.state == null || State.TIMEOUT.priority >
collectionState.state.priority) {
+ collectionState.state = State.TIMEOUT;
+ }
+ break;
+ }
+ if (collectionState.state == null || state.priority >
collectionState.state.priority) {
+ collectionState.state = state;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ collectionState.state = State.INTERRUPTED;
+ }
+ }
+
+ protected ModifiableSolrParams
createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp,
String keyId) {
+ return new ModifiableSolrParams().set(PARAM_KEY_ID, keyId == null ?
NO_KEY_ID : keyId);
+ }
+
+ private State sendEncryptionRequestWithRetry(
+ Replica replica,
+ SolrQueryRequest req,
+ ModifiableSolrParams params,
+ String keyId) {
+ for (int numAttempts = 0; numAttempts < DISTRIBUTION_MAX_ATTEMPTS;
numAttempts++) {
+ try {
+ SimpleSolrResponse response = sendEncryptionRequest(replica, req,
params);
+ Object responseStatus = response.getResponse().get(STATUS);
+ Object responseState = response.getResponse().get(ENCRYPTION_STATE);
+ log.info("Encryption state {} status {} for replica {} keyId {} in {}
ms", responseStatus, responseState, replica.getName(), keyId,
response.getElapsedTime());
+ if (responseState != null) {
+ return State.fromValue(responseState.toString());
+ }
+ } catch (SolrServerException | IOException e) {
+ log.error("Error occurred while sending encryption request", e);
+ }
+ }
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed
encryption request to replica " + replica.getName() + " for keyId " + keyId);
+ }
+
+ private SimpleSolrResponse sendEncryptionRequest(
+ Replica replica,
+ SolrQueryRequest req,
+ ModifiableSolrParams params)
+ throws SolrServerException, IOException {
+ // Use the update-only http client, considering encryption is an update as
we indeed create new Lucene segments.
+ Http2SolrClient solrClient =
req.getCoreContainer().getUpdateShardHandler().getUpdateOnlyHttpClient();
+ GenericSolrRequest distributedRequest = new
GenericSolrRequest(SolrRequest.METHOD.POST, req.getPath(), params);
+ return solrClient.requestWithBaseUrl(replica.getCoreUrl(), null,
distributedRequest);
}
private long elapsedTimeNs(long startTimeNs) {
@@ -615,4 +691,30 @@ public class EncryptionRequestHandler extends
RequestHandlerBase {
this.keyId = keyId;
}
}
+
+ /**
+ * Internal encryption request status.
+ */
+ private static class RequestStatus {
+
+ static final RequestStatus ERROR = new RequestStatus(State.PENDING, false);
+ static final RequestStatus COMPLETE = new RequestStatus(State.COMPLETE,
true);
+ static final RequestStatus PENDING = new RequestStatus(State.PENDING,
true);
+ static final RequestStatus BUSY = new RequestStatus(State.BUSY, false);
+
+ final State state;
+ final boolean success;
+
+ RequestStatus(State state, boolean success) {
+ this.state = state;
+ this.success = success;
+ }
+ }
+
+ /**
+ * Holds a mutable {@link State}.
+ */
+ private static class StateHolder {
+ State state;
+ }
}