dsmiley commented on a change in pull request #1770:
URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r479687914



##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
##########
@@ -82,18 +81,19 @@ protected SolrClient getClient(String baseUrl) {
     return httpClient;
   }
 
-  public Cancellable asyncReq(Req req, AsyncListener<Rsp> asyncListener) {
+  @Override
+  public CompletableFuture<Rsp> requestAsync(Req req) {
+    CompletableFuture<Rsp> apiFuture = new CompletableFuture<>();
     Rsp rsp = new Rsp();
     boolean isNonRetryable = req.request instanceof IsUpdateRequest || 
ADMIN_PATHS.contains(req.request.getPath());
     ServerIterator it = new ServerIterator(req, zombieServers);
-    asyncListener.onStart();
     final AtomicBoolean cancelled = new AtomicBoolean(false);

Review comment:
       It seems to me that the changes you've done allow us to remove the 
AtomicBoolean cancelled (with synchronization of it), simplifying the code.  If 
we want to know if the request has been cancelled, we can simply check 
apiFuture.isCancelled.  Right?
   @CaoManhDat was/is the synchronization necessary?

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -580,23 +644,71 @@ public void registerDocCollectionWatcher(String 
collection, DocCollectionWatcher
           throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, 
exceptions, routes);
         }
       }
-    } else {
-      for (Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        String url = entry.getKey();
-        LBSolrClient.Req lbRequest = entry.getValue();
+      return null;
+    });
+    updateFuture.exceptionally((error) -> {
+      if (updateFuture.isCancelled()) {
+        for (CompletableFuture<NamedList<Object>> cf : futuresArray) {
+          cf.cancel(true);
+        }
+      }
+      return null;
+    });
+    return updateFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithoutExecutor(final Map<String, ? 
extends LBSolrClient.Req> routes,

Review comment:
       Since the asyc & sync code paths are rather different inside here, I 
think you should structure this method as a big if-else.  The only part in 
common is like 2 lines.

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
##########
@@ -112,95 +112,106 @@ public void onFailure(Exception e, boolean retryReq) {
               if (cancelled.get()) {
                 return;
               }
-              Cancellable cancellable = doRequest(url, req, rsp, 
isNonRetryable, it.isServingZombieServer(), this);
-              currentCancellable.set(cancellable);
+              CompletableFuture<NamedList<Object>> future = 
doAsyncRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
+              currentFuture.set(future);
             }
           } finally {
             MDC.remove("LBSolrClient.url");
           }
         } else {
-          asyncListener.onFailure(e);
+          apiFuture.completeExceptionally(e);
         }
       }
     };
     try {
-      Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, 
isNonRetryable, it.isServingZombieServer(), retryListener);
-      currentCancellable.set(cancellable);
+      CompletableFuture<NamedList<Object>> future = 
doAsyncRequest(it.nextOrError(), req, rsp, isNonRetryable, 
it.isServingZombieServer(), retryListener);
+      currentFuture.set(future);
     } catch (SolrServerException e) {
-      asyncListener.onFailure(e);
+      apiFuture.completeExceptionally(e);
     }
-    return () -> {
-      synchronized (cancelled) {
-        cancelled.set(true);
-        if (currentCancellable.get() != null) {
-          currentCancellable.get().cancel();
+    apiFuture.exceptionally((error) -> {
+      if (apiFuture.isCancelled()) {
+        synchronized (cancelled) {
+          cancelled.set(true);
+          if (currentFuture.get() != null) {

Review comment:
       Let's replace with a get to a local variable, then cancel the local var 
if non-null.
   
   Or... hmmm... seems practically speaking this AtomicReference will never 
have a null value.  If the catch of SolrServerException above returned 
apiFuture right there, then it would be clearer that currentFuture must always 
have a non-null value.

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
##########
@@ -112,95 +112,106 @@ public void onFailure(Exception e, boolean retryReq) {
               if (cancelled.get()) {
                 return;
               }
-              Cancellable cancellable = doRequest(url, req, rsp, 
isNonRetryable, it.isServingZombieServer(), this);
-              currentCancellable.set(cancellable);
+              CompletableFuture<NamedList<Object>> future = 
doAsyncRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
+              currentFuture.set(future);
             }
           } finally {
             MDC.remove("LBSolrClient.url");
           }
         } else {
-          asyncListener.onFailure(e);
+          apiFuture.completeExceptionally(e);
         }
       }
     };
     try {
-      Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, 
isNonRetryable, it.isServingZombieServer(), retryListener);
-      currentCancellable.set(cancellable);
+      CompletableFuture<NamedList<Object>> future = 
doAsyncRequest(it.nextOrError(), req, rsp, isNonRetryable, 
it.isServingZombieServer(), retryListener);
+      currentFuture.set(future);
     } catch (SolrServerException e) {
-      asyncListener.onFailure(e);
+      apiFuture.completeExceptionally(e);
     }
-    return () -> {
-      synchronized (cancelled) {
-        cancelled.set(true);
-        if (currentCancellable.get() != null) {
-          currentCancellable.get().cancel();
+    apiFuture.exceptionally((error) -> {
+      if (apiFuture.isCancelled()) {
+        synchronized (cancelled) {
+          cancelled.set(true);
+          if (currentFuture.get() != null) {
+            currentFuture.get().cancel(true);
+          }
         }
       }
-    };
+      return null;
+    });
+    return apiFuture;
   }
 
   private interface RetryListener {
     void onSuccess(Rsp rsp);
     void onFailure(Exception e, boolean retryReq);
   }
 
-  private Cancellable doRequest(String baseUrl, Req req, Rsp rsp, boolean 
isNonRetryable,
+  private CompletableFuture<NamedList<Object>> doAsyncRequest(String baseUrl, 
Req req, Rsp rsp, boolean isNonRetryable,
                          boolean isZombie, RetryListener listener) {
     rsp.server = baseUrl;
     req.getRequest().setBasePath(baseUrl);
-    return 
((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new 
AsyncListener<>() {
-      @Override
-      public void onSuccess(NamedList<Object> result) {
-        rsp.rsp = result;
+    CompletableFuture<NamedList<Object>> future = 
((Http2SolrClient)getClient(baseUrl)).requestAsync(req.getRequest(), null);

Review comment:
       The cast to Http2SolrClient is not necessary (so says my IDE).  
Furthermore, the getClient method could be modified to have a covariant return 
type of Http2SolrClient to make this clearer.

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String 
collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new 
HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, 
isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, 
isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, 
apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {
+          apiFuture.completeExceptionally(error);
+        } else {
+          doDeleteQuery(updateRequest, nonRoutableParams, routes, 
shardResponses, apiFuture, start, isAsyncRequest);
+        }
+      });
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: 
responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      apiFuture.exceptionally((error) -> {
+        if (apiFuture.isCancelled()) {
+          updateFuture.cancel(true);
         }
+        return null;
+      });
+    }
+
+    return apiFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? 
extends LBSolrClient.Req> routes,
+                                                        
NamedList<NamedList<?>> shardResponses,
+                                                        boolean 
isAsyncRequest) {
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = 
new HashMap<>(routes.size());
+    for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+      final String url = entry.getKey();
+      final LBSolrClient.Req lbRequest = entry.getValue();
+      try {
+        MDC.put("CloudSolrClient.url", url);
+        final CompletableFuture<NamedList<Object>> future = new 
CompletableFuture<>();
+        if (isAsyncRequest) {
+          CompletableFuture<LBSolrClient.Rsp> reqFuture = 
getLbClient().requestAsync(lbRequest);
+          reqFuture.whenComplete((result, error) -> {

Review comment:
       This can be replaced with:
   `future = reqFuture.thenApply(LBSolrClient.Rsp::getResponse);`

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String 
collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new 
HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, 
isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, 
isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, 
apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {
+          apiFuture.completeExceptionally(error);
+        } else {
+          doDeleteQuery(updateRequest, nonRoutableParams, routes, 
shardResponses, apiFuture, start, isAsyncRequest);
+        }
+      });
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: 
responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      apiFuture.exceptionally((error) -> {
+        if (apiFuture.isCancelled()) {
+          updateFuture.cancel(true);
         }
+        return null;
+      });
+    }
+
+    return apiFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? 
extends LBSolrClient.Req> routes,
+                                                        
NamedList<NamedList<?>> shardResponses,
+                                                        boolean 
isAsyncRequest) {
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = 
new HashMap<>(routes.size());
+    for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+      final String url = entry.getKey();
+      final LBSolrClient.Req lbRequest = entry.getValue();
+      try {
+        MDC.put("CloudSolrClient.url", url);
+        final CompletableFuture<NamedList<Object>> future = new 
CompletableFuture<>();
+        if (isAsyncRequest) {
+          CompletableFuture<LBSolrClient.Rsp> reqFuture = 
getLbClient().requestAsync(lbRequest);
+          reqFuture.whenComplete((result, error) -> {
+            if (!reqFuture.isCompletedExceptionally()) {
+              future.complete(result.getResponse());
+            } else {
+              future.completeExceptionally(error);
+            }
+          });
+        } else {

Review comment:
       Once we can guarantee requestAsync will always work, I think we should 
always do parallel updates with an async code path, thus allowing us to remove 
some of this code.  A step in that direction would be to add an Executor 
argument to LbSolrClient.requestAsync and do a trivial default implementation 
using it.

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String 
collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new 
HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, 
isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, 
isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, 
apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {
+          apiFuture.completeExceptionally(error);
+        } else {
+          doDeleteQuery(updateRequest, nonRoutableParams, routes, 
shardResponses, apiFuture, start, isAsyncRequest);
+        }
+      });
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: 
responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      apiFuture.exceptionally((error) -> {
+        if (apiFuture.isCancelled()) {
+          updateFuture.cancel(true);
         }
+        return null;
+      });
+    }
+
+    return apiFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? 
extends LBSolrClient.Req> routes,
+                                                        
NamedList<NamedList<?>> shardResponses,
+                                                        boolean 
isAsyncRequest) {
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = 
new HashMap<>(routes.size());
+    for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+      final String url = entry.getKey();
+      final LBSolrClient.Req lbRequest = entry.getValue();
+      try {
+        MDC.put("CloudSolrClient.url", url);
+        final CompletableFuture<NamedList<Object>> future = new 
CompletableFuture<>();
+        if (isAsyncRequest) {
+          CompletableFuture<LBSolrClient.Rsp> reqFuture = 
getLbClient().requestAsync(lbRequest);
+          reqFuture.whenComplete((result, error) -> {
+            if (!reqFuture.isCompletedExceptionally()) {
+              future.complete(result.getResponse());
+            } else {
+              future.completeExceptionally(error);
+            }
+          });
+        } else {
+          threadPool.submit(() -> {
+            try {
+              future.complete(getLbClient().request(lbRequest).getResponse());
+            } catch (Exception e) {
+              future.completeExceptionally(e);
+            }
+          });
+
+          responseFutures.put(url, future.whenComplete((result, error) -> {

Review comment:
       Here I see you populate shardResponses & exceptions only for the 
synchronous code path.  I don't see why we don't do something with the async 
code path.  The CF that's created leaves scope and doesn't save its data 
anywhere.  I suspect the async code path with parallel updates is not tested.

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -476,8 +479,9 @@ public void registerDocCollectionWatcher(String collection, 
DocCollectionWatcher
     
assertZKStateProvider().zkStateReader.registerDocCollectionWatcher(collection, 
watcher);
   }
 
-  @SuppressWarnings({"unchecked"})
-  private NamedList<Object> directUpdate(AbstractUpdateRequest request, String 
collection) throws SolrServerException {
+  private CompletableFuture<NamedList<Object>> 
directUpdate(AbstractUpdateRequest request,

Review comment:
       I have to wonder if it's worthwhile to have directUpdate support Http2 
Async callback given the complexity here?  I *do* like it internally for 
parallel updates -- doUpdatesWithExecutor.

##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String 
collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new 
HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, 
isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, 
isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, 
apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {
+          apiFuture.completeExceptionally(error);
+        } else {
+          doDeleteQuery(updateRequest, nonRoutableParams, routes, 
shardResponses, apiFuture, start, isAsyncRequest);
+        }
+      });
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: 
responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      apiFuture.exceptionally((error) -> {
+        if (apiFuture.isCancelled()) {
+          updateFuture.cancel(true);
         }
+        return null;
+      });
+    }
+
+    return apiFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? 
extends LBSolrClient.Req> routes,
+                                                        
NamedList<NamedList<?>> shardResponses,
+                                                        boolean 
isAsyncRequest) {
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = 
new HashMap<>(routes.size());
+    for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+      final String url = entry.getKey();
+      final LBSolrClient.Req lbRequest = entry.getValue();
+      try {
+        MDC.put("CloudSolrClient.url", url);
+        final CompletableFuture<NamedList<Object>> future = new 
CompletableFuture<>();
+        if (isAsyncRequest) {
+          CompletableFuture<LBSolrClient.Rsp> reqFuture = 
getLbClient().requestAsync(lbRequest);
+          reqFuture.whenComplete((result, error) -> {
+            if (!reqFuture.isCompletedExceptionally()) {
+              future.complete(result.getResponse());
+            } else {
+              future.completeExceptionally(error);
+            }
+          });
+        } else {
+          threadPool.submit(() -> {
+            try {
+              future.complete(getLbClient().request(lbRequest).getResponse());
+            } catch (Exception e) {
+              future.completeExceptionally(e);
+            }
+          });
+
+          responseFutures.put(url, future.whenComplete((result, error) -> {

Review comment:
       Also, there needs to be synchronization to avoid concurrent population 
of shardResponses & exceptions structures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to