bruno-roustant commented on code in PR #115: URL: https://github.com/apache/solr-sandbox/pull/115#discussion_r1944888376
########## encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java: ########## @@ -260,12 +339,133 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } else { rsp.add(STATUS, STATUS_FAILURE); } - log.info("Responding encryption state={} success={} for keyId={}", - encryptionState, success, keyId); - rsp.add(ENCRYPTION_STATE, encryptionState); + rsp.add(ENCRYPTION_STATE, state.value); + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption state={} success={} for keyId={} timeMs={}", + state.value, success, keyId, timeMs); } } + private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) { + boolean success = false; + String collectionName = null; + State collectionState = null; + long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0); + long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + timeAllowedMs; + try { + collectionName = req.getCore().getCoreDescriptor().getCollectionName(); + if (collectionName == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode."); + } + log.debug("Encrypt request distributed for keyId={} collection={}", keyId, collectionName); + DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName); + if (docCollection == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found."); + } + try (SolrClientHolder solrClient = getHttpSolrClient(req)) { + ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId); + Collection<Slice> slices = docCollection.getActiveSlices(); + Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size()); + final String collectionNameFinal = 
collectionName; + for (Slice slice : slices) { + Replica replica = slice.getLeader(); + if (replica == null) { + log.error("No leader found for shard {}", slice.getName()); + collectionState = State.ERROR; + continue; + } + encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, params, solrClient.getClient(), keyId, collectionNameFinal)); + } + try { + List<Future<State>> responses = timeAllowedMs <= 0 ? + executor.invokeAll(encryptRequests) + : executor.invokeAll(encryptRequests, timeAllowedMs, TimeUnit.MILLISECONDS); + if (isTimeout(maxTimeNs)) { + log.warn("Timeout distributing encryption request for keyId={} collection={}", keyId, collectionName); + if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) { + collectionState = State.TIMEOUT; + } + } + for (Future<State> response : responses) { + State state; + try { + state = response.get(); + } catch (ExecutionException e) { + collectionState = State.ERROR; + break; + } + if (collectionState == null || state.priority > collectionState.priority) { + collectionState = state; + } + } + } catch (InterruptedException e) { + collectionState = State.INTERRUPTED; + } + success = collectionState == null || collectionState.isSuccess(); + } + } finally { + String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE; + rsp.add(STATUS, statusValue); + rsp.addToLog(STATUS, statusValue); + if (collectionState != null) { + rsp.add(ENCRYPTION_STATE, collectionState.value); + rsp.addToLog(ENCRYPTION_STATE, collectionState.value); + } + if (log.isInfoEnabled()) { + long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + log.info("Responding encryption distributed state={} success={} for keyId={} collection={} timeMs={}", + (collectionState == null ? 
null : collectionState.value), success, keyId, collectionName, timeMs); + } + } + } + + private SolrClientHolder getHttpSolrClient(SolrQueryRequest req) { + CoreContainer coreContainer = req.getCore().getCoreContainer(); + CloudSolrClient cloudSolrClient = coreContainer.getSolrClientCache().getCloudSolrClient(coreContainer.getZkController().getZkClient().getZkServerAddress()); + if (cloudSolrClient instanceof CloudHttp2SolrClient) { + return new SolrClientHolder(((CloudHttp2SolrClient) cloudSolrClient).getHttpClient(), false); + } + return new SolrClientHolder(new Http2SolrClient.Builder().build(), true); + } + + protected ModifiableSolrParams createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, String keyId) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(PARAM_KEY_ID, keyId == null ? NO_KEY_ID : keyId); + return params; + } + + boolean isTimeout(long maxTimeNs) { Review Comment: Following your advice, I replaced this code with TimeOut. In doing so, I discovered a long-overflow bug in TimeOut: it occurs when a TimeOut is created with interval = Long.MAX_VALUE, or with any interval large enough to overflow when added to TimeSource.getTimeNs(). I'll create an issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For additional commands, e-mail: issues-h...@solr.apache.org