dsmiley commented on a change in pull request #1770:
URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r480417292



##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {

Review comment:
       Minor point: I think it reads nicer to have the non-exceptional code 
first, and the exceptional path last.
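
       A minimal sketch of the ordering I have in mind, assuming the callback 
only needs to relay the outcome into another future. `WhenCompleteOrdering` and 
`relay` are hypothetical names for illustration, not code from this PR. 
Checking `error == null` first keeps the normal path at the top:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class; not part of BaseCloudSolrClient.
final class WhenCompleteOrdering {

  // Relays the outcome of `source` into a new future, handling success first.
  static <T> CompletableFuture<T> relay(CompletableFuture<T> source) {
    CompletableFuture<T> target = new CompletableFuture<>();
    source.whenComplete((result, error) -> {
      if (error == null) {
        // Non-exceptional path first: this is what most readers look for.
        target.complete(result);
      } else {
        // Exceptional path last.
        target.completeExceptionally(error);
      }
    });
    return target;
  }
}
```

       Inside `whenComplete`, a non-null `error` already signals exceptional 
completion, so the callback does not need to ask the future again.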

##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -862,15 +1015,39 @@ public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<
     }
     List<String> inputCollections =
         collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
-    return requestWithRetryOnStaleState(request, 0, inputCollections);
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    final AtomicBoolean cancelled = new AtomicBoolean(false);
+    final AtomicReference<CompletableFuture<NamedList<Object>>> currentFuture = new AtomicReference<>();
+    apiFuture.exceptionally((error) -> {
+      if (apiFuture.isCancelled()) {
+        synchronized (cancelled) {

Review comment:
       This code is similar to the LB class where I provided feedback.  The 
same feedback may apply here -- I don't see the point of synchronizing on 
`cancelled`, or why `cancelled` exists at all, if we have a CompletableFuture 
that can already hold the cancelled state.
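
       To make the idea concrete, here is a rough sketch under my own 
assumptions. `CancelPropagation`, `wrap`, and `startAttempt` are hypothetical 
names, and I have not checked this against every race the current locking may 
be guarding. The API-level future is the only holder of the cancelled state; 
each new attempt is published first and then checked against 
`apiFuture.isCancelled()`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not the code under review.
final class CancelPropagation {

  // Creates the API-level future. Cancelling it cancels whatever attempt is in flight.
  static <T> CompletableFuture<T> wrap(AtomicReference<CompletableFuture<T>> currentAttempt) {
    CompletableFuture<T> apiFuture = new CompletableFuture<>();
    apiFuture.whenComplete((result, error) -> {
      if (apiFuture.isCancelled()) {
        CompletableFuture<T> inFlight = currentAttempt.get();
        if (inFlight != null) {
          inFlight.cancel(true);
        }
      }
    });
    return apiFuture;
  }

  // Called by (hypothetical) retry code before it relies on a new attempt.
  // Publishes the attempt first, then checks for cancellation, so either this
  // method or the callback above sees the other side's write.
  static <T> boolean startAttempt(CompletableFuture<T> apiFuture,
                                  AtomicReference<CompletableFuture<T>> currentAttempt,
                                  CompletableFuture<T> attempt) {
    currentAttempt.set(attempt);
    if (apiFuture.isCancelled()) {
      attempt.cancel(true);
      return false; // caller should stop retrying
    }
    return true;
  }
}
```

       If the existing `synchronized` block protects something this ordering 
does not, that would be a good thing to capture in a code comment.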

##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -853,6 +993,19 @@ public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<
 
   @Override
   public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection) throws SolrServerException, IOException {
+      // synchronous requests should return an already completed future
+      return getNowOrException(makeRequest(request, collection, false));
+  }
+
+  /**
+   * Makes a request either synchronously or asynchronously depending on the isAsyncRequest parameter. The returned
+   * CompletableFuture will already be completed in the case of a sync request.
+   *
+   * For async requests, the internal LB Solr client given by getLbClient() must be an LBHttp2SolrClient.
+   */
+  CompletableFuture<NamedList<Object>> makeRequest(SolrRequest<?> request,

Review comment:
       Should this be defined as `protected`?

##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -862,15 +1015,39 @@ public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<
     }
     List<String> inputCollections =
         collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
-    return requestWithRetryOnStaleState(request, 0, inputCollections);
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    final AtomicBoolean cancelled = new AtomicBoolean(false);
+    final AtomicReference<CompletableFuture<NamedList<Object>>> currentFuture = new AtomicReference<>();
+    apiFuture.exceptionally((error) -> {
+      if (apiFuture.isCancelled()) {
+        synchronized (cancelled) {

Review comment:
       I'll at-mention @CaoManhDat again, who may know, as I think he authored 
this mechanism.

##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java
##########
@@ -246,6 +248,42 @@ public final T process(SolrClient client) throws SolrServerException, IOExceptio
     return process(client, null);
   }
 
+  /**
+   * Send this request to a {@link SolrClient} asynchronously
+   *
+   * @param client the SolrClient to communicate with
+   * @param collection the collection to execute the request against
+   *
+   * @return a {@link CompletableFuture} that tracks the progress of the async request.
+   * Once completed, the CompletableFuture will contain the response.
+   */
+  public final CompletableFuture<T> processAsynchronously(SolrClient client, String collection) {
+    final long startNanos = System.nanoTime();
+    final CompletableFuture<NamedList<Object>> internalFuture = client.requestAsync(this, collection);
+    final CompletableFuture<T> apiFuture = new CompletableFuture<>();
+
+    internalFuture.whenComplete((result, error) -> {

Review comment:
       I think this can be adapted to use `internalFuture.thenApply(...)`
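
       Roughly what I mean, as a sketch only. `ThenApplySketch`, 
`TimedResponse`, and `processAsync` are hypothetical stand-ins rather than 
SolrJ types, and I am assuming the callback just wraps the raw result and 
records the elapsed time:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not SolrRequest itself.
final class ThenApplySketch {

  // Hypothetical stand-in for a typed response that records elapsed time.
  static final class TimedResponse<R> {
    final R body;
    final long elapsedMillis;

    TimedResponse(R body, long elapsedMillis) {
      this.body = body;
      this.elapsedMillis = elapsedMillis;
    }
  }

  // Derives the typed future directly from the internal one; no second
  // CompletableFuture has to be created and completed by hand.
  static <R> CompletableFuture<TimedResponse<R>> processAsync(CompletableFuture<R> internalFuture) {
    final long startNanos = System.nanoTime();
    return internalFuture.thenApply(raw -> {
      long elapsedMillis =
          TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
      return new TimedResponse<>(raw, elapsedMillis);
    });
  }
}
```

       Two behaviors to keep in mind if you go this way: a failure of the 
internal future reaches the derived future wrapped in a `CompletionException`, 
and cancelling the derived future does not cancel the internal one. If callers 
are expected to cancel the in-flight request through the returned future, that 
part still needs explicit handling.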






