dsmiley commented on code in PR #115:
URL: https://github.com/apache/solr-sandbox/pull/115#discussion_r1904965192


##########
encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java:
##########
@@ -260,12 +330,116 @@ public void handleRequestBody(SolrQueryRequest req, 
SolrQueryResponse rsp) throw
       } else {
         rsp.add(STATUS, STATUS_FAILURE);
       }
-      log.info("Responding encryption state={} success={} for keyId={}",
-               encryptionState, success, keyId);
-      rsp.add(ENCRYPTION_STATE, encryptionState);
+      rsp.add(ENCRYPTION_STATE, state.value);
+      long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+      log.info("Responding encryption state={} success={} for keyId={} 
timeMs={}",
+          state.value, success, keyId, timeMs);
     }
   }
 
+  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId, long startTimeNs) {
+    boolean success = false;
+    String collectionName = null;
+    State collectionState = null;
+    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+    long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + 
timeAllowedMs;
+    try {
+      collectionName = req.getCore().getCoreDescriptor().getCollectionName();
+      if (collectionName == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
+      }
+      log.debug("Encrypt request distributed for keyId={} collection={}", 
keyId, collectionName);
+      DocCollection docCollection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+      if (docCollection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not 
found.");
+      }
+      try (SolrClientHolder solrClient = getHttpSolrClient(req)) {
+        ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
+        for (Slice slice : docCollection.getActiveSlices()) {

Review Comment:
   Hmm; this sends the requests in series rather than in parallel.  We could chat about that tomorrow.



##########
encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java:
##########
@@ -194,15 +258,22 @@ public void handleRequestBody(SolrQueryRequest req, 
SolrQueryResponse rsp) throw
     } else if (keyId.equals(NO_KEY_ID)) {
       keyId = null;
     }
+    // Check the defined DirectoryFactory instance.
     EncryptionDirectoryFactory.getFactory(req.getCore());
+
+    if (req.getParams().getBool(DISTRIB, false)) {

Review Comment:
   It'd be nice if we could default "distrib" based on whether the request was 
addressed to the collection or not.  Maybe that metadata is there?



##########
encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java:
##########
@@ -260,12 +330,116 @@ public void handleRequestBody(SolrQueryRequest req, 
SolrQueryResponse rsp) throw
       } else {
         rsp.add(STATUS, STATUS_FAILURE);
       }
-      log.info("Responding encryption state={} success={} for keyId={}",
-               encryptionState, success, keyId);
-      rsp.add(ENCRYPTION_STATE, encryptionState);
+      rsp.add(ENCRYPTION_STATE, state.value);
+      long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+      log.info("Responding encryption state={} success={} for keyId={} 
timeMs={}",
+          state.value, success, keyId, timeMs);
     }
   }
 
+  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId, long startTimeNs) {
+    boolean success = false;
+    String collectionName = null;
+    State collectionState = null;
+    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+    long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + 
timeAllowedMs;
+    try {
+      collectionName = req.getCore().getCoreDescriptor().getCollectionName();
+      if (collectionName == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
+      }
+      log.debug("Encrypt request distributed for keyId={} collection={}", 
keyId, collectionName);
+      DocCollection docCollection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+      if (docCollection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not 
found.");
+      }
+      try (SolrClientHolder solrClient = getHttpSolrClient(req)) {
+        ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
+        for (Slice slice : docCollection.getActiveSlices()) {
+          if (isTimeout(maxTimeNs)) {
+            log.warn("Timeout distributing encryption request for keyId={} 
collection={}", keyId, collectionName);
+            if (collectionState == null || State.TIMEOUT.priority > 
collectionState.priority) {
+              collectionState = State.TIMEOUT;
+            }
+            break;
+          }
+          Replica replica = slice.getLeader();
+          if (replica == null) {
+            log.error("No leader found for shard {}", slice.getName());
+            collectionState = State.ERROR;
+            continue;
+          }
+          State state = sendEncryptionRequestWithRetry(replica, params, 
solrClient.getClient(), keyId, collectionName);
+          if (collectionState == null || state.priority > 
collectionState.priority) {
+            collectionState = state;
+          }
+        }
+        success = collectionState == null || collectionState.isSuccess();
+      }
+    } finally {
+      if (success) {
+        rsp.add(STATUS, STATUS_SUCCESS);
+      } else {
+        rsp.add(STATUS, STATUS_FAILURE);
+      }
+      if (collectionState != null) {
+        rsp.add(ENCRYPTION_STATE, collectionState.value);
+      }
+      if (log.isInfoEnabled()) {
+        long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+        log.info("Responding encryption distributed state={} success={} for 
keyId={} collection={} timeMs={}",
+            (collectionState == null ? null : collectionState.value), success, 
keyId, collectionName, timeMs);
+      }
+    }
+  }
+
+  private SolrClientHolder getHttpSolrClient(SolrQueryRequest req) {
+    CoreContainer coreContainer = req.getCore().getCoreContainer();
+    CloudSolrClient cloudSolrClient = 
coreContainer.getSolrClientCache().getCloudSolrClient(coreContainer.getZkController().getZkClient().getZkServerAddress());
+    if (cloudSolrClient instanceof CloudHttp2SolrClient) {
+      return new SolrClientHolder(((CloudHttp2SolrClient) 
cloudSolrClient).getHttpClient(), false);
+    }
+    return new SolrClientHolder(new Http2SolrClient.Builder().build(), true);
+  }
+
+  protected ModifiableSolrParams 
createDistributedRequestParams(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(PARAM_KEY_ID, keyId == null ? NO_KEY_ID : keyId);
+    return params;
+  }
+
+  boolean isTimeout(long maxTimeNs) {
+    return System.nanoTime() > maxTimeNs;
+  }
+
+  private State sendEncryptionRequestWithRetry(Replica replica, 
ModifiableSolrParams params, Http2SolrClient httpSolrClient, String keyId, 
String collection) {
+    for (int numAttempts = 0; numAttempts < DISTRIBUTION_MAX_ATTEMPTS; 
numAttempts++) {
+      try {
+        NamedList<Object> response = sendEncryptionRequest(replica, params, 
httpSolrClient);
+        Object responseStatus = response.get(STATUS);
+        Object responseState = response.get(ENCRYPTION_STATE);
+        log.info("Encryption state {} status {} for replica {} keyId {} 
collection={}", responseStatus, responseState, replica.getName(), keyId, 
collection);
+        if (responseState != null) {
+          return State.fromValue(responseState.toString());
+        }
+      } catch (SolrServerException | IOException e) {
+        log.error("Error occurred while sending encryption request", e);
+      }
+    }
+    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed 
encryption request to replica " + replica.getName() + " for keyId " + keyId + " 
collection " + collection);
+  }
+
+  private NamedList<Object> sendEncryptionRequest(Replica replica, 
ModifiableSolrParams params, Http2SolrClient httpSolrClient)
+      throws SolrServerException, IOException {
+    GenericSolrRequest request = new 
GenericSolrRequest(SolrRequest.METHOD.POST, getPath(), params);
+    request.setBasePath(replica.getCoreUrl());

Review Comment:
   `setBasePath` is now gone in Solr main.  Instead, you should use 
`httpSolrClient.requestWithBaseUrl`.



##########
encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java:
##########
@@ -130,13 +143,29 @@ public EncryptionStatus encrypt(String keyId) throws 
Exception {
     params.set(PARAM_KEY_ID, keyId);
     params.set(PARAM_TENANT_ID, TENANT_ID);
     params.set(PARAM_ENCRYPTION_KEY_BLOB, generateKeyBlob(keyId));
+    if (shouldDistributeEncryptRequest()) {
+      return encryptDistrib(params);
+    }
     GenericSolrRequest encryptRequest = new 
GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params);
     EncryptionStatus encryptionStatus = new EncryptionStatus();
     forAllReplicas(replica -> {
       NamedList<Object> response = requestCore(encryptRequest, replica);
+      EncryptionRequestHandler.State state = 
EncryptionRequestHandler.State.fromValue(response.get(ENCRYPTION_STATE).toString());
       encryptionStatus.success &= response.get(STATUS).equals(STATUS_SUCCESS);
-      encryptionStatus.complete &= 
response.get(ENCRYPTION_STATE).equals(STATE_COMPLETE);
-    });
+      encryptionStatus.complete &= state == 
EncryptionRequestHandler.State.COMPLETE;
+    }, false);
+    return encryptionStatus;
+  }
+
+  private EncryptionStatus encryptDistrib(ModifiableSolrParams params) throws 
SolrServerException, IOException {
+    params.set(DISTRIB, "true");

Review Comment:
   It's icky to manipulate the input arg, but I can let it slide.  I wish there were a 
Solr one-liner to wrap SolrParams with a default.  `SolrParams.wrapDefaults` is one 
technique.



##########
encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java:
##########
@@ -220,10 +249,14 @@ public void assertCannotReloadCores() throws Exception {
   }
 
   /** Processes the given {@code action} for all replicas of the collection 
defined in the constructor. */
-  public void forAllReplicas(Consumer<Replica> action) {
+  public void forAllReplicas(Consumer<Replica> action, boolean onlyLeaders) {

Review Comment:
   I think the functional arg should be last, as a matter of taste.



##########
encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java:
##########
@@ -260,12 +330,116 @@ public void handleRequestBody(SolrQueryRequest req, 
SolrQueryResponse rsp) throw
       } else {
         rsp.add(STATUS, STATUS_FAILURE);
       }
-      log.info("Responding encryption state={} success={} for keyId={}",
-               encryptionState, success, keyId);
-      rsp.add(ENCRYPTION_STATE, encryptionState);
+      rsp.add(ENCRYPTION_STATE, state.value);
+      long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+      log.info("Responding encryption state={} success={} for keyId={} 
timeMs={}",
+          state.value, success, keyId, timeMs);
     }
   }
 
+  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId, long startTimeNs) {
+    boolean success = false;
+    String collectionName = null;
+    State collectionState = null;
+    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+    long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + 
timeAllowedMs;
+    try {
+      collectionName = req.getCore().getCoreDescriptor().getCollectionName();
+      if (collectionName == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
+      }
+      log.debug("Encrypt request distributed for keyId={} collection={}", 
keyId, collectionName);
+      DocCollection docCollection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+      if (docCollection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not 
found.");
+      }
+      try (SolrClientHolder solrClient = getHttpSolrClient(req)) {
+        ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
+        for (Slice slice : docCollection.getActiveSlices()) {
+          if (isTimeout(maxTimeNs)) {
+            log.warn("Timeout distributing encryption request for keyId={} 
collection={}", keyId, collectionName);
+            if (collectionState == null || State.TIMEOUT.priority > 
collectionState.priority) {
+              collectionState = State.TIMEOUT;
+            }
+            break;
+          }
+          Replica replica = slice.getLeader();
+          if (replica == null) {
+            log.error("No leader found for shard {}", slice.getName());
+            collectionState = State.ERROR;
+            continue;
+          }
+          State state = sendEncryptionRequestWithRetry(replica, params, 
solrClient.getClient(), keyId, collectionName);
+          if (collectionState == null || state.priority > 
collectionState.priority) {
+            collectionState = state;
+          }
+        }
+        success = collectionState == null || collectionState.isSuccess();
+      }
+    } finally {
+      if (success) {
+        rsp.add(STATUS, STATUS_SUCCESS);
+      } else {
+        rsp.add(STATUS, STATUS_FAILURE);
+      }
+      if (collectionState != null) {
+        rsp.add(ENCRYPTION_STATE, collectionState.value);
+      }
+      if (log.isInfoEnabled()) {
+        long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+        log.info("Responding encryption distributed state={} success={} for 
keyId={} collection={} timeMs={}",
+            (collectionState == null ? null : collectionState.value), success, 
keyId, collectionName, timeMs);
+      }
+    }
+  }
+
+  private SolrClientHolder getHttpSolrClient(SolrQueryRequest req) {

Review Comment:
   Why not just use 
`org.apache.solr.core.CoreContainer#getDefaultHttpSolrClient`?  This is very 
new BTW.



##########
encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java:
##########
@@ -260,12 +330,116 @@ public void handleRequestBody(SolrQueryRequest req, 
SolrQueryResponse rsp) throw
       } else {
         rsp.add(STATUS, STATUS_FAILURE);
       }
-      log.info("Responding encryption state={} success={} for keyId={}",
-               encryptionState, success, keyId);
-      rsp.add(ENCRYPTION_STATE, encryptionState);
+      rsp.add(ENCRYPTION_STATE, state.value);
+      long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+      log.info("Responding encryption state={} success={} for keyId={} 
timeMs={}",
+          state.value, success, keyId, timeMs);
     }
   }
 
+  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId, long startTimeNs) {
+    boolean success = false;
+    String collectionName = null;
+    State collectionState = null;
+    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+    long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + 
timeAllowedMs;
+    try {
+      collectionName = req.getCore().getCoreDescriptor().getCollectionName();
+      if (collectionName == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
+      }
+      log.debug("Encrypt request distributed for keyId={} collection={}", 
keyId, collectionName);
+      DocCollection docCollection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+      if (docCollection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not 
found.");
+      }
+      try (SolrClientHolder solrClient = getHttpSolrClient(req)) {
+        ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
+        for (Slice slice : docCollection.getActiveSlices()) {
+          if (isTimeout(maxTimeNs)) {
+            log.warn("Timeout distributing encryption request for keyId={} 
collection={}", keyId, collectionName);
+            if (collectionState == null || State.TIMEOUT.priority > 
collectionState.priority) {
+              collectionState = State.TIMEOUT;
+            }
+            break;
+          }
+          Replica replica = slice.getLeader();
+          if (replica == null) {
+            log.error("No leader found for shard {}", slice.getName());
+            collectionState = State.ERROR;
+            continue;
+          }
+          State state = sendEncryptionRequestWithRetry(replica, params, 
solrClient.getClient(), keyId, collectionName);
+          if (collectionState == null || state.priority > 
collectionState.priority) {
+            collectionState = state;
+          }
+        }
+        success = collectionState == null || collectionState.isSuccess();
+      }
+    } finally {
+      if (success) {
+        rsp.add(STATUS, STATUS_SUCCESS);
+      } else {
+        rsp.add(STATUS, STATUS_FAILURE);
+      }
+      if (collectionState != null) {
+        rsp.add(ENCRYPTION_STATE, collectionState.value);
+      }
+      if (log.isInfoEnabled()) {
+        long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+        log.info("Responding encryption distributed state={} success={} for 
keyId={} collection={} timeMs={}",

Review Comment:
   FYI, the collection is redundant with MDC.
   And you might want to put state and/or success into `rsp.addToLog`, where it 
may be seen in structured logs.



##########
encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java:
##########
@@ -260,12 +330,116 @@ public void handleRequestBody(SolrQueryRequest req, 
SolrQueryResponse rsp) throw
       } else {
         rsp.add(STATUS, STATUS_FAILURE);
       }
-      log.info("Responding encryption state={} success={} for keyId={}",
-               encryptionState, success, keyId);
-      rsp.add(ENCRYPTION_STATE, encryptionState);
+      rsp.add(ENCRYPTION_STATE, state.value);
+      long timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs);
+      log.info("Responding encryption state={} success={} for keyId={} 
timeMs={}",
+          state.value, success, keyId, timeMs);
     }
   }
 
+  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, 
String keyId, long startTimeNs) {
+    boolean success = false;
+    String collectionName = null;
+    State collectionState = null;
+    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
+    long maxTimeNs = timeAllowedMs <= 0 ? Long.MAX_VALUE : startTimeNs + 
timeAllowedMs;
+    try {
+      collectionName = req.getCore().getCoreDescriptor().getCollectionName();
+      if (collectionName == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
+      }
+      log.debug("Encrypt request distributed for keyId={} collection={}", 
keyId, collectionName);
+      DocCollection docCollection = 
req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
+      if (docCollection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Parameter " + DISTRIB + " present but collection '" + collectionName + "' not 
found.");
+      }
+      try (SolrClientHolder solrClient = getHttpSolrClient(req)) {
+        ModifiableSolrParams params = createDistributedRequestParams(req, rsp, 
keyId);
+        for (Slice slice : docCollection.getActiveSlices()) {
+          if (isTimeout(maxTimeNs)) {
+            log.warn("Timeout distributing encryption request for keyId={} 
collection={}", keyId, collectionName);
+            if (collectionState == null || State.TIMEOUT.priority > 
collectionState.priority) {
+              collectionState = State.TIMEOUT;
+            }
+            break;
+          }
+          Replica replica = slice.getLeader();
+          if (replica == null) {
+            log.error("No leader found for shard {}", slice.getName());
+            collectionState = State.ERROR;
+            continue;
+          }
+          State state = sendEncryptionRequestWithRetry(replica, params, 
solrClient.getClient(), keyId, collectionName);
+          if (collectionState == null || state.priority > 
collectionState.priority) {
+            collectionState = state;
+          }
+        }
+        success = collectionState == null || collectionState.isSuccess();
+      }
+    } finally {
+      if (success) {
+        rsp.add(STATUS, STATUS_SUCCESS);
+      } else {
+        rsp.add(STATUS, STATUS_FAILURE);
+      }
+      if (collectionState != null) {
+        rsp.add(ENCRYPTION_STATE, collectionState.value);
+      }
+      if (log.isInfoEnabled()) {

Review Comment:
   BTW you needn't ever check for `isInfoEnabled`.  Such a check is often needed, 
however, for debug or trace level, but even then not always.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org

Reply via email to