dsmiley commented on a change in pull request #1770: URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r480417292
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher } } - final NamedList<Throwable> exceptions = new NamedList<>(); - @SuppressWarnings({"rawtypes"}) - final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery + final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery long start = System.nanoTime(); + CompletableFuture<Void> updateFuture; if (parallelUpdates) { - final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size()); - for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { - final String url = entry.getKey(); - final LBSolrClient.Req lbRequest = entry.getValue(); - try { - MDC.put("CloudSolrClient.url", url); - responseFutures.put(url, threadPool.submit(() -> { - return getLbClient().request(lbRequest).getResponse(); - })); - } finally { - MDC.remove("CloudSolrClient.url"); + updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest); + } else { + updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest); + } + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + if (!isAsyncRequest) { + try { + updateFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof SolrServerException) { + throw (SolrServerException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new SolrServerException(cause); } } + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } else { + updateFuture.whenComplete((result, error) -> { + if (updateFuture.isCompletedExceptionally()) { Review comment: Minor point: I think it reads nicer to have the non-exceptional code first, and the exceptional path last. ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -862,15 +1015,39 @@ public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map< } List<String> inputCollections = collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true); - return requestWithRetryOnStaleState(request, 0, inputCollections); + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + final AtomicBoolean cancelled = new AtomicBoolean(false); + final AtomicReference<CompletableFuture<NamedList<Object>>> currentFuture = new AtomicReference<>(); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + synchronized (cancelled) { Review comment: This code is similar to the LB class where I provided feedback. The same feedback may apply here -- I don't see the point of synchronization on cancelled or why cancelled exists if we have a CF which can hold that cancelled state. ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -853,6 +993,19 @@ public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map< @Override public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection) throws SolrServerException, IOException { + // synchronous requests should return an already completed future + return getNowOrException(makeRequest(request, collection, false)); + } + + /** + * Makes a request either synchronously or asynchronously depending on the isAsyncRequest parameter. The returned + * CompletableFuture will already be completed in the case of a sync request. + * + * For async requests, the internal LB Solr client given by getLbClient() must be an LBHttp2SolrClient. + */ + CompletableFuture<NamedList<Object>> makeRequest(SolrRequest<?> request, Review comment: Define as protected? ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -862,15 +1015,39 @@ public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map< } List<String> inputCollections = collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true); - return requestWithRetryOnStaleState(request, 0, inputCollections); + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + final AtomicBoolean cancelled = new AtomicBoolean(false); + final AtomicReference<CompletableFuture<NamedList<Object>>> currentFuture = new AtomicReference<>(); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + synchronized (cancelled) { Review comment: I'll at-mention @CaoManhDat again whom may know as I think he authored this mechanism. ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java ########## @@ -246,6 +248,42 @@ public final T process(SolrClient client) throws SolrServerException, IOExceptio return process(client, null); } + /** + * Send this request to a {@link SolrClient} asynchronously + * + * @param client the SolrClient to communicate with + * @param collection the collection to execute the request against + * + * @return a {@link CompletableFuture} that tracks the progress of the async request. + * Once completed, the CompletableFuture will contain the response. + */ + public final CompletableFuture<T> processAsynchronously(SolrClient client, String collection) { + final long startNanos = System.nanoTime(); + final CompletableFuture<NamedList<Object>> internalFuture = client.requestAsync(this, collection); + final CompletableFuture<T> apiFuture = new CompletableFuture<>(); + + internalFuture.whenComplete((result, error) -> { Review comment: I think this can be adapted to use `internalFuture.thenApply(...)` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org