dsmiley commented on code in PR #115: URL: https://github.com/apache/solr-sandbox/pull/115#discussion_r1911643958
########## encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java: ########## @@ -130,16 +144,32 @@ public EncryptionStatus encrypt(String keyId) throws Exception { params.set(PARAM_KEY_ID, keyId); params.set(PARAM_TENANT_ID, TENANT_ID); params.set(PARAM_ENCRYPTION_KEY_BLOB, generateKeyBlob(keyId)); + if (shouldDistributeEncryptRequest()) { + return encryptDistrib(params); + } GenericSolrRequest encryptRequest = new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params); EncryptionStatus encryptionStatus = new EncryptionStatus(); - forAllReplicas(replica -> { + forAllReplicas(false, replica -> { NamedList<Object> response = requestCore(encryptRequest, replica); + EncryptionRequestHandler.State state = EncryptionRequestHandler.State.fromValue(response.get(ENCRYPTION_STATE).toString()); encryptionStatus.success &= response.get(STATUS).equals(STATUS_SUCCESS); - encryptionStatus.complete &= response.get(ENCRYPTION_STATE).equals(STATE_COMPLETE); + encryptionStatus.complete &= state == EncryptionRequestHandler.State.COMPLETE; }); return encryptionStatus; } + private EncryptionStatus encryptDistrib(SolrParams params) throws SolrServerException, IOException { + params = SolrParams.wrapDefaults(params, new ModifiableSolrParams().set(DISTRIB, "true")); Review Comment: Nice :-) BTW set's 2nd arg value needn't be a string; it's overloaded to take the primitive boolean. 
########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + timeAllowedMs; Review Comment: you are adding milliseconds and nanoseconds. 
Maybe convert timeAllowedMs to nanoseconds on fetch so that you don't have any millisecond vars. ########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? 
Long.MAX_VALUE : startTimeNs + timeAllowedMs; + try { + collectionName = req.getCore().getCoreDescriptor().getCollectionName(); + if (collectionName == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode."); + } + log.debug("Encrypt request distributed for keyId={} collection={}", keyId, collectionName); + DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName); + if (docCollection == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found."); + } + try (SolrClientHolder solrClient = getHttpSolrClient(req)) { + ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId); + Collection<Slice> slices = docCollection.getActiveSlices(); + Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size()); + final String collectionNameFinal = collectionName; + for (Slice slice : slices) { + Replica replica = slice.getLeader(); + if (replica == null) { + log.error("No leader found for shard {}", slice.getName()); + collectionState = State.ERROR; + continue; + } + encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, params, solrClient.getClient(), keyId, collectionNameFinal)); + } + try { + List<Future<State>> responses = timeAllowedMs <= 0 ? 
+ executor.invokeAll(encryptRequests) + : executor.invokeAll(encryptRequests, timeAllowedMs, TimeUnit.MILLISECONDS); + if (isTimeout(maxTimeNs)) { + log.warn("Timeout distributing encryption request for keyId={} collection={}", keyId, collectionName); + if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) { + collectionState = State.TIMEOUT; + } + } + for (Future<State> response : responses) { + State state; + try { + state = response.get(); + } catch (ExecutionException e) { + collectionState = State.ERROR; + break; + } + if (collectionState == null || state.priority > collectionState.priority) { + collectionState = state; + } + } + } catch (InterruptedException e) { + collectionState = State.INTERRUPTED; + } + success = collectionState == null || collectionState.isSuccess(); + } + } finally { + String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE; + rsp.add(STATUS, statusValue); + rsp.addToLog(STATUS, statusValue); + if (collectionState != null) { + rsp.add(ENCRYPTION_STATE, collectionState.value); + rsp.addToLog(ENCRYPTION_STATE, collectionState.value); + } + if (log.isInfoEnabled()) { + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption distributed state={} success={} for keyId={} collection={} timeMs={}", + (collectionState == null ? 
null : collectionState.value), success, keyId, collectionName, timeMs); + } + } + } + + private SolrClientHolder getHttpSolrClient(SolrQueryRequest req) { + CoreContainer coreContainer = req.getCore().getCoreContainer(); + CloudSolrClient cloudSolrClient = coreContainer.getSolrClientCache().getCloudSolrClient(coreContainer.getZkController().getZkClient().getZkServerAddress()); + if (cloudSolrClient instanceof CloudHttp2SolrClient) { + return new SolrClientHolder(((CloudHttp2SolrClient) cloudSolrClient).getHttpClient(), false); + } + return new SolrClientHolder(new Http2SolrClient.Builder().build(), true); Review Comment: I don't think this is needed, nor is SolrClientHolder. Just use an existing client. You can use `coreContainer.getUpdateShardHandler().getUpdateOnlyHttpClient()`. It's mildly debatable if it's the perfect client since an encryption request is sorta kinda an "update" but it's really not a big deal. You shouldn't create new clients (which was your fallback here) as it's tricky to do it correctly inside Solr as there are some instrumentation & authentication matters to attend to that UpdateShardHandler works out for you. 
########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + timeAllowedMs; + try { + collectionName = req.getCore().getCoreDescriptor().getCollectionName(); + if (collectionName == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode."); + } + log.debug("Encrypt request distributed for keyId={} collection={}", keyId, collectionName); + DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName); + if (docCollection == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found."); + } + try (SolrClientHolder solrClient = getHttpSolrClient(req)) { + ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId); + Collection<Slice> slices = docCollection.getActiveSlices(); + Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size()); + final String collectionNameFinal = 
collectionName; + for (Slice slice : slices) { + Replica replica = slice.getLeader(); + if (replica == null) { + log.error("No leader found for shard {}", slice.getName()); + collectionState = State.ERROR; + continue; + } + encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, params, solrClient.getClient(), keyId, collectionNameFinal)); + } + try { + List<Future<State>> responses = timeAllowedMs <= 0 ? + executor.invokeAll(encryptRequests) + : executor.invokeAll(encryptRequests, timeAllowedMs, TimeUnit.MILLISECONDS); + if (isTimeout(maxTimeNs)) { + log.warn("Timeout distributing encryption request for keyId={} collection={}", keyId, collectionName); + if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) { + collectionState = State.TIMEOUT; + } + } + for (Future<State> response : responses) { + State state; + try { + state = response.get(); + } catch (ExecutionException e) { + collectionState = State.ERROR; + break; + } + if (collectionState == null || state.priority > collectionState.priority) { + collectionState = state; + } + } + } catch (InterruptedException e) { + collectionState = State.INTERRUPTED; + } + success = collectionState == null || collectionState.isSuccess(); + } + } finally { + String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE; + rsp.add(STATUS, statusValue); + rsp.addToLog(STATUS, statusValue); + if (collectionState != null) { + rsp.add(ENCRYPTION_STATE, collectionState.value); + rsp.addToLog(ENCRYPTION_STATE, collectionState.value); + } + if (log.isInfoEnabled()) { + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption distributed state={} success={} for keyId={} collection={} timeMs={}", + (collectionState == null ? 
null : collectionState.value), success, keyId, collectionName, timeMs); + } + } + } + + private SolrClientHolder getHttpSolrClient(SolrQueryRequest req) { + CoreContainer coreContainer = req.getCore().getCoreContainer(); + CloudSolrClient cloudSolrClient = coreContainer.getSolrClientCache().getCloudSolrClient(coreContainer.getZkController().getZkClient().getZkServerAddress()); + if (cloudSolrClient instanceof CloudHttp2SolrClient) { + return new SolrClientHolder(((CloudHttp2SolrClient) cloudSolrClient).getHttpClient(), false); + } + return new SolrClientHolder(new Http2SolrClient.Builder().build(), true); + } + + protected ModifiableSolrParams createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, String keyId) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(PARAM_KEY_ID, keyId == null ? NO_KEY_ID : keyId); + return params; Review Comment: minor: this can be exactly one line since `set` returns itself ########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? 
Long.MAX_VALUE : startTimeNs + timeAllowedMs; + try { + collectionName = req.getCore().getCoreDescriptor().getCollectionName(); + if (collectionName == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode."); + } + log.debug("Encrypt request distributed for keyId={} collection={}", keyId, collectionName); + DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName); + if (docCollection == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found."); + } + try (SolrClientHolder solrClient = getHttpSolrClient(req)) { + ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId); + Collection<Slice> slices = docCollection.getActiveSlices(); + Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size()); + final String collectionNameFinal = collectionName; + for (Slice slice : slices) { + Replica replica = slice.getLeader(); + if (replica == null) { + log.error("No leader found for shard {}", slice.getName()); + collectionState = State.ERROR; + continue; + } + encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, params, solrClient.getClient(), keyId, collectionNameFinal)); + } + try { + List<Future<State>> responses = timeAllowedMs <= 0 ? 
+ executor.invokeAll(encryptRequests) + : executor.invokeAll(encryptRequests, timeAllowedMs, TimeUnit.MILLISECONDS); + if (isTimeout(maxTimeNs)) { + log.warn("Timeout distributing encryption request for keyId={} collection={}", keyId, collectionName); + if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) { + collectionState = State.TIMEOUT; + } + } + for (Future<State> response : responses) { + State state; + try { + state = response.get(); + } catch (ExecutionException e) { + collectionState = State.ERROR; Review Comment: should log.error(e) ########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? 
Long.MAX_VALUE : startTimeNs + timeAllowedMs; + try { + collectionName = req.getCore().getCoreDescriptor().getCollectionName(); + if (collectionName == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode."); + } + log.debug("Encrypt request distributed for keyId={} collection={}", keyId, collectionName); + DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName); + if (docCollection == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found."); + } + try (SolrClientHolder solrClient = getHttpSolrClient(req)) { + ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId); + Collection<Slice> slices = docCollection.getActiveSlices(); + Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size()); + final String collectionNameFinal = collectionName; + for (Slice slice : slices) { + Replica replica = slice.getLeader(); + if (replica == null) { + log.error("No leader found for shard {}", slice.getName()); + collectionState = State.ERROR; + continue; + } + encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, params, solrClient.getClient(), keyId, collectionNameFinal)); + } + try { + List<Future<State>> responses = timeAllowedMs <= 0 ? 
+ executor.invokeAll(encryptRequests) + : executor.invokeAll(encryptRequests, timeAllowedMs, TimeUnit.MILLISECONDS); + if (isTimeout(maxTimeNs)) { + log.warn("Timeout distributing encryption request for keyId={} collection={}", keyId, collectionName); + if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) { + collectionState = State.TIMEOUT; + } + } + for (Future<State> response : responses) { + State state; + try { + state = response.get(); + } catch (ExecutionException e) { + collectionState = State.ERROR; + break; + } + if (collectionState == null || state.priority > collectionState.priority) { + collectionState = state; + } + } + } catch (InterruptedException e) { + collectionState = State.INTERRUPTED; + } + success = collectionState == null || collectionState.isSuccess(); + } + } finally { + String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE; + rsp.add(STATUS, statusValue); + rsp.addToLog(STATUS, statusValue); + if (collectionState != null) { + rsp.add(ENCRYPTION_STATE, collectionState.value); + rsp.addToLog(ENCRYPTION_STATE, collectionState.value); + } + if (log.isInfoEnabled()) { + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption distributed state={} success={} for keyId={} collection={} timeMs={}", + (collectionState == null ? 
null : collectionState.value), success, keyId, collectionName, timeMs); + } + } + } + + private SolrClientHolder getHttpSolrClient(SolrQueryRequest req) { + CoreContainer coreContainer = req.getCore().getCoreContainer(); + CloudSolrClient cloudSolrClient = coreContainer.getSolrClientCache().getCloudSolrClient(coreContainer.getZkController().getZkClient().getZkServerAddress()); + if (cloudSolrClient instanceof CloudHttp2SolrClient) { + return new SolrClientHolder(((CloudHttp2SolrClient) cloudSolrClient).getHttpClient(), false); + } + return new SolrClientHolder(new Http2SolrClient.Builder().build(), true); + } + + protected ModifiableSolrParams createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, String keyId) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(PARAM_KEY_ID, keyId == null ? NO_KEY_ID : keyId); + return params; + } + + boolean isTimeout(long maxTimeNs) { Review Comment: this method name confused me a little. If you name it "hasTimedOut", it'd be a bit clearer. Also "maxTimeNs" reads like it's probably a duration, but it's an instant in time. "timeoutAtNs" would be clearer. 
########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + timeAllowedMs; + try { + collectionName = req.getCore().getCoreDescriptor().getCollectionName(); + if (collectionName == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode."); + } + log.debug("Encrypt request distributed for keyId={} collection={}", keyId, collectionName); + DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName); + if (docCollection == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found."); + } + try (SolrClientHolder solrClient = getHttpSolrClient(req)) { + ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId); + Collection<Slice> slices = docCollection.getActiveSlices(); + Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size()); + final String collectionNameFinal = 
collectionName; + for (Slice slice : slices) { + Replica replica = slice.getLeader(); + if (replica == null) { + log.error("No leader found for shard {}", slice.getName()); + collectionState = State.ERROR; + continue; + } + encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, params, solrClient.getClient(), keyId, collectionNameFinal)); + } + try { + List<Future<State>> responses = timeAllowedMs <= 0 ? + executor.invokeAll(encryptRequests) + : executor.invokeAll(encryptRequests, timeAllowedMs, TimeUnit.MILLISECONDS); + if (isTimeout(maxTimeNs)) { + log.warn("Timeout distributing encryption request for keyId={} collection={}", keyId, collectionName); + if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) { + collectionState = State.TIMEOUT; + } + } + for (Future<State> response : responses) { + State state; + try { + state = response.get(); + } catch (ExecutionException e) { + collectionState = State.ERROR; + break; + } + if (collectionState == null || state.priority > collectionState.priority) { + collectionState = state; + } + } + } catch (InterruptedException e) { + collectionState = State.INTERRUPTED; + } + success = collectionState == null || collectionState.isSuccess(); Review Comment: why is null a success? 
########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + timeAllowedMs; + try { + collectionName = req.getCore().getCoreDescriptor().getCollectionName(); + if (collectionName == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode."); + } + log.debug("Encrypt request distributed for keyId={} collection={}", keyId, collectionName); + DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName); + if (docCollection == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found."); + } + try (SolrClientHolder solrClient = getHttpSolrClient(req)) { + ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId); + Collection<Slice> slices = docCollection.getActiveSlices(); + Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size()); + final String collectionNameFinal = 
collectionName; + for (Slice slice : slices) { + Replica replica = slice.getLeader(); + if (replica == null) { + log.error("No leader found for shard {}", slice.getName()); + collectionState = State.ERROR; + continue; + } + encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, params, solrClient.getClient(), keyId, collectionNameFinal)); + } + try { + List<Future<State>> responses = timeAllowedMs <= 0 ? + executor.invokeAll(encryptRequests) + : executor.invokeAll(encryptRequests, timeAllowedMs, TimeUnit.MILLISECONDS); + if (isTimeout(maxTimeNs)) { + log.warn("Timeout distributing encryption request for keyId={} collection={}", keyId, collectionName); + if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) { + collectionState = State.TIMEOUT; + } + } + for (Future<State> response : responses) { + State state; + try { + state = response.get(); + } catch (ExecutionException e) { + collectionState = State.ERROR; + break; + } + if (collectionState == null || state.priority > collectionState.priority) { + collectionState = state; + } + } + } catch (InterruptedException e) { + collectionState = State.INTERRUPTED; + } + success = collectionState == null || collectionState.isSuccess(); + } + } finally { + String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE; + rsp.add(STATUS, statusValue); + rsp.addToLog(STATUS, statusValue); + if (collectionState != null) { + rsp.add(ENCRYPTION_STATE, collectionState.value); + rsp.addToLog(ENCRYPTION_STATE, collectionState.value); + } + if (log.isInfoEnabled()) { + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption distributed state={} success={} for keyId={} collection={} timeMs={}", + (collectionState == null ? 
null : collectionState.value), success, keyId, collectionName, timeMs); + } + } + } + + private SolrClientHolder getHttpSolrClient(SolrQueryRequest req) { + CoreContainer coreContainer = req.getCore().getCoreContainer(); + CloudSolrClient cloudSolrClient = coreContainer.getSolrClientCache().getCloudSolrClient(coreContainer.getZkController().getZkClient().getZkServerAddress()); + if (cloudSolrClient instanceof CloudHttp2SolrClient) { + return new SolrClientHolder(((CloudHttp2SolrClient) cloudSolrClient).getHttpClient(), false); + } + return new SolrClientHolder(new Http2SolrClient.Builder().build(), true); + } + + protected ModifiableSolrParams createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, String keyId) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(PARAM_KEY_ID, keyId == null ? NO_KEY_ID : keyId); + return params; + } + + boolean isTimeout(long maxTimeNs) { + return System.nanoTime() > maxTimeNs; + } + + private State sendEncryptionRequestWithRetry(Replica replica, ModifiableSolrParams params, Http2SolrClient httpSolrClient, String keyId, String collection) { + for (int numAttempts = 0; numAttempts < DISTRIBUTION_MAX_ATTEMPTS; numAttempts++) { + try { + NamedList<Object> response = sendEncryptionRequest(replica, params, httpSolrClient); + Object responseStatus = response.get(STATUS); + Object responseState = response.get(ENCRYPTION_STATE); + log.info("Encryption state {} status {} for replica {} keyId {} collection={}", responseStatus, responseState, replica.getName(), keyId, collection); + if (responseState != null) { + return State.fromValue(responseState.toString()); + } + } catch (SolrServerException | IOException e) { + log.error("Error occurred while sending encryption request", e); + } + } + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed encryption request to replica " + replica.getName() + " for keyId " + keyId + " collection " + collection); + } + + private NamedList<Object> 
sendEncryptionRequest(Replica replica, ModifiableSolrParams params, Http2SolrClient httpSolrClient) + throws SolrServerException, IOException { + GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.POST, getPath(), params); + request.setBasePath(replica.getCoreUrl()); + return httpSolrClient.request(request); + } + + protected String getPath() { + return "/admin/encrypt"; Review Comment: instead of hard-coding this, should probably use the path of the current request (as we are calling ourselves), which is request.getPath() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For additional commands, e-mail: issues-h...@solr.apache.org