dsmiley commented on a change in pull request #1770: URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r479687914
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java ########## @@ -82,18 +81,19 @@ protected SolrClient getClient(String baseUrl) { return httpClient; } - public Cancellable asyncReq(Req req, AsyncListener<Rsp> asyncListener) { + @Override + public CompletableFuture<Rsp> requestAsync(Req req) { + CompletableFuture<Rsp> apiFuture = new CompletableFuture<>(); Rsp rsp = new Rsp(); boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath()); ServerIterator it = new ServerIterator(req, zombieServers); - asyncListener.onStart(); final AtomicBoolean cancelled = new AtomicBoolean(false); Review comment: It seems to me that the changes you've done allow us to remove the AtomicBoolean cancelled (with synchronization of it), simplifying the code. If we want to know if the request has been cancelled, we can simply check apiFuture.isCancelled. Right? @CaoManhDat was/is the synchronization necessary? ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -580,23 +644,71 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, exceptions, routes); } } - } else { - for (Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { - String url = entry.getKey(); - LBSolrClient.Req lbRequest = entry.getValue(); + return null; + }); + updateFuture.exceptionally((error) -> { + if (updateFuture.isCancelled()) { + for (CompletableFuture<NamedList<Object>> cf : futuresArray) { + cf.cancel(true); + } + } + return null; + }); + return updateFuture; + } + + private CompletableFuture<Void> doUpdatesWithoutExecutor(final Map<String, ? extends LBSolrClient.Req> routes, Review comment: Since the async & sync code paths are rather different inside here, I think you should structure this method as a big if-else. 
The only part in common is like 2 lines. ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java ########## @@ -112,95 +112,106 @@ public void onFailure(Exception e, boolean retryReq) { if (cancelled.get()) { return; } - Cancellable cancellable = doRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this); - currentCancellable.set(cancellable); + CompletableFuture<NamedList<Object>> future = doAsyncRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this); + currentFuture.set(future); } } finally { MDC.remove("LBSolrClient.url"); } } else { - asyncListener.onFailure(e); + apiFuture.completeExceptionally(e); } } }; try { - Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener); - currentCancellable.set(cancellable); + CompletableFuture<NamedList<Object>> future = doAsyncRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener); + currentFuture.set(future); } catch (SolrServerException e) { - asyncListener.onFailure(e); + apiFuture.completeExceptionally(e); } - return () -> { - synchronized (cancelled) { - cancelled.set(true); - if (currentCancellable.get() != null) { - currentCancellable.get().cancel(); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + synchronized (cancelled) { + cancelled.set(true); + if (currentFuture.get() != null) { Review comment: Let's replace with a get to a local variable, then cancel the local var if non-null. Or... hmmm... seems practically speaking this AtomicReference will never have a null value. If the catch of SolrServerException above returned apiFuture right there, then it would be clearer that currentFuture must always have a non-null value. 
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java ########## @@ -112,95 +112,106 @@ public void onFailure(Exception e, boolean retryReq) { if (cancelled.get()) { return; } - Cancellable cancellable = doRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this); - currentCancellable.set(cancellable); + CompletableFuture<NamedList<Object>> future = doAsyncRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this); + currentFuture.set(future); } } finally { MDC.remove("LBSolrClient.url"); } } else { - asyncListener.onFailure(e); + apiFuture.completeExceptionally(e); } } }; try { - Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener); - currentCancellable.set(cancellable); + CompletableFuture<NamedList<Object>> future = doAsyncRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener); + currentFuture.set(future); } catch (SolrServerException e) { - asyncListener.onFailure(e); + apiFuture.completeExceptionally(e); } - return () -> { - synchronized (cancelled) { - cancelled.set(true); - if (currentCancellable.get() != null) { - currentCancellable.get().cancel(); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + synchronized (cancelled) { + cancelled.set(true); + if (currentFuture.get() != null) { + currentFuture.get().cancel(true); + } } } - }; + return null; + }); + return apiFuture; } private interface RetryListener { void onSuccess(Rsp rsp); void onFailure(Exception e, boolean retryReq); } - private Cancellable doRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable, + private CompletableFuture<NamedList<Object>> doAsyncRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable, boolean isZombie, RetryListener listener) { rsp.server = baseUrl; req.getRequest().setBasePath(baseUrl); - return 
((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new AsyncListener<>() { - @Override - public void onSuccess(NamedList<Object> result) { - rsp.rsp = result; + CompletableFuture<NamedList<Object>> future = ((Http2SolrClient)getClient(baseUrl)).requestAsync(req.getRequest(), null); Review comment: The cast to Http2SolrClient is not necessary (so says my IDE). Furthermore, the getClient method could be modified to have a covariant return type of Http2SolrClient to make this clearer. ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher } } - final NamedList<Throwable> exceptions = new NamedList<>(); - @SuppressWarnings({"rawtypes"}) - final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery + final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery long start = System.nanoTime(); + CompletableFuture<Void> updateFuture; if (parallelUpdates) { - final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size()); - for (final Map.Entry<String, ? 
extends LBSolrClient.Req> entry : routes.entrySet()) { - final String url = entry.getKey(); - final LBSolrClient.Req lbRequest = entry.getValue(); - try { - MDC.put("CloudSolrClient.url", url); - responseFutures.put(url, threadPool.submit(() -> { - return getLbClient().request(lbRequest).getResponse(); - })); - } finally { - MDC.remove("CloudSolrClient.url"); + updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest); + } else { + updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest); + } + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + if (!isAsyncRequest) { + try { + updateFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof SolrServerException) { + throw (SolrServerException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new SolrServerException(cause); } } + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } else { + updateFuture.whenComplete((result, error) -> { + if (updateFuture.isCompletedExceptionally()) { + apiFuture.completeExceptionally(error); + } else { + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } + }); - for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) { - final String url = entry.getKey(); - final Future<NamedList<?>> responseFuture = entry.getValue(); - try { - shardResponses.add(url, responseFuture.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - exceptions.add(url, e.getCause()); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + updateFuture.cancel(true); } + 
return null; + }); + } + + return apiFuture; + } + + private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? extends LBSolrClient.Req> routes, + NamedList<NamedList<?>> shardResponses, + boolean isAsyncRequest) { + final NamedList<Throwable> exceptions = new NamedList<>(); + final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = new HashMap<>(routes.size()); + for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { + final String url = entry.getKey(); + final LBSolrClient.Req lbRequest = entry.getValue(); + try { + MDC.put("CloudSolrClient.url", url); + final CompletableFuture<NamedList<Object>> future = new CompletableFuture<>(); + if (isAsyncRequest) { + CompletableFuture<LBSolrClient.Rsp> reqFuture = getLbClient().requestAsync(lbRequest); + reqFuture.whenComplete((result, error) -> { Review comment: This can be replaced with: `future = reqFuture.thenApply(LBSolrClient.Rsp::getResponse);` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher } } - final NamedList<Throwable> exceptions = new NamedList<>(); - @SuppressWarnings({"rawtypes"}) - final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery + final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery long start = System.nanoTime(); + CompletableFuture<Void> updateFuture; if (parallelUpdates) { - final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size()); - for (final Map.Entry<String, ? 
extends LBSolrClient.Req> entry : routes.entrySet()) { - final String url = entry.getKey(); - final LBSolrClient.Req lbRequest = entry.getValue(); - try { - MDC.put("CloudSolrClient.url", url); - responseFutures.put(url, threadPool.submit(() -> { - return getLbClient().request(lbRequest).getResponse(); - })); - } finally { - MDC.remove("CloudSolrClient.url"); + updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest); + } else { + updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest); + } + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + if (!isAsyncRequest) { + try { + updateFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof SolrServerException) { + throw (SolrServerException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new SolrServerException(cause); } } + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } else { + updateFuture.whenComplete((result, error) -> { + if (updateFuture.isCompletedExceptionally()) { + apiFuture.completeExceptionally(error); + } else { + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } + }); - for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) { - final String url = entry.getKey(); - final Future<NamedList<?>> responseFuture = entry.getValue(); - try { - shardResponses.add(url, responseFuture.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - exceptions.add(url, e.getCause()); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + updateFuture.cancel(true); } + 
return null; + }); + } + + return apiFuture; + } + + private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? extends LBSolrClient.Req> routes, + NamedList<NamedList<?>> shardResponses, + boolean isAsyncRequest) { + final NamedList<Throwable> exceptions = new NamedList<>(); + final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = new HashMap<>(routes.size()); + for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { + final String url = entry.getKey(); + final LBSolrClient.Req lbRequest = entry.getValue(); + try { + MDC.put("CloudSolrClient.url", url); + final CompletableFuture<NamedList<Object>> future = new CompletableFuture<>(); + if (isAsyncRequest) { + CompletableFuture<LBSolrClient.Rsp> reqFuture = getLbClient().requestAsync(lbRequest); + reqFuture.whenComplete((result, error) -> { + if (!reqFuture.isCompletedExceptionally()) { + future.complete(result.getResponse()); + } else { + future.completeExceptionally(error); + } + }); + } else { Review comment: Once we can guarantee requestAsync will always work, I think we should always do parallel updates with an async code path, thus allowing us to remove some of this code. A step in that direction would be to add an Executor argument to LBSolrClient.requestAsync and do a trivial default implementation using it. 
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher } } - final NamedList<Throwable> exceptions = new NamedList<>(); - @SuppressWarnings({"rawtypes"}) - final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery + final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery long start = System.nanoTime(); + CompletableFuture<Void> updateFuture; if (parallelUpdates) { - final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size()); - for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { - final String url = entry.getKey(); - final LBSolrClient.Req lbRequest = entry.getValue(); - try { - MDC.put("CloudSolrClient.url", url); - responseFutures.put(url, threadPool.submit(() -> { - return getLbClient().request(lbRequest).getResponse(); - })); - } finally { - MDC.remove("CloudSolrClient.url"); + updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest); + } else { + updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest); + } + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + if (!isAsyncRequest) { + try { + updateFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof SolrServerException) { + throw (SolrServerException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new SolrServerException(cause); } } + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } else { + updateFuture.whenComplete((result, error) -> { + if 
(updateFuture.isCompletedExceptionally()) { + apiFuture.completeExceptionally(error); + } else { + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } + }); - for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) { - final String url = entry.getKey(); - final Future<NamedList<?>> responseFuture = entry.getValue(); - try { - shardResponses.add(url, responseFuture.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - exceptions.add(url, e.getCause()); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + updateFuture.cancel(true); } + return null; + }); + } + + return apiFuture; + } + + private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? extends LBSolrClient.Req> routes, + NamedList<NamedList<?>> shardResponses, + boolean isAsyncRequest) { + final NamedList<Throwable> exceptions = new NamedList<>(); + final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = new HashMap<>(routes.size()); + for (final Map.Entry<String, ? 
extends LBSolrClient.Req> entry : routes.entrySet()) { + final String url = entry.getKey(); + final LBSolrClient.Req lbRequest = entry.getValue(); + try { + MDC.put("CloudSolrClient.url", url); + final CompletableFuture<NamedList<Object>> future = new CompletableFuture<>(); + if (isAsyncRequest) { + CompletableFuture<LBSolrClient.Rsp> reqFuture = getLbClient().requestAsync(lbRequest); + reqFuture.whenComplete((result, error) -> { + if (!reqFuture.isCompletedExceptionally()) { + future.complete(result.getResponse()); + } else { + future.completeExceptionally(error); + } + }); + } else { + threadPool.submit(() -> { + try { + future.complete(getLbClient().request(lbRequest).getResponse()); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + + responseFutures.put(url, future.whenComplete((result, error) -> { Review comment: Here I see you populate shardResponses & exceptions only for the synchronous code path. I don't see why we don't do something with the async code path. The CF that's created leaves scope and doesn't save its data anywhere. I suspect the async code path with parallel updates is not tested. ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -476,8 +479,9 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher assertZKStateProvider().zkStateReader.registerDocCollectionWatcher(collection, watcher); } - @SuppressWarnings({"unchecked"}) - private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException { + private CompletableFuture<NamedList<Object>> directUpdate(AbstractUpdateRequest request, Review comment: I have to wonder if it's worthwhile to have directUpdate support Http2 Async callback given the complexity here? I *do* like it internally for parallel updates -- doUpdatesWithExecutor. 
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher } } - final NamedList<Throwable> exceptions = new NamedList<>(); - @SuppressWarnings({"rawtypes"}) - final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery + final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery long start = System.nanoTime(); + CompletableFuture<Void> updateFuture; if (parallelUpdates) { - final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size()); - for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { - final String url = entry.getKey(); - final LBSolrClient.Req lbRequest = entry.getValue(); - try { - MDC.put("CloudSolrClient.url", url); - responseFutures.put(url, threadPool.submit(() -> { - return getLbClient().request(lbRequest).getResponse(); - })); - } finally { - MDC.remove("CloudSolrClient.url"); + updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest); + } else { + updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest); + } + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + if (!isAsyncRequest) { + try { + updateFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof SolrServerException) { + throw (SolrServerException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new SolrServerException(cause); } } + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } else { + updateFuture.whenComplete((result, error) -> { + if 
(updateFuture.isCompletedExceptionally()) { + apiFuture.completeExceptionally(error); + } else { + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } + }); - for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) { - final String url = entry.getKey(); - final Future<NamedList<?>> responseFuture = entry.getValue(); - try { - shardResponses.add(url, responseFuture.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - exceptions.add(url, e.getCause()); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + updateFuture.cancel(true); } + return null; + }); + } + + return apiFuture; + } + + private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? extends LBSolrClient.Req> routes, + NamedList<NamedList<?>> shardResponses, + boolean isAsyncRequest) { + final NamedList<Throwable> exceptions = new NamedList<>(); + final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = new HashMap<>(routes.size()); + for (final Map.Entry<String, ? 
extends LBSolrClient.Req> entry : routes.entrySet()) { + final String url = entry.getKey(); + final LBSolrClient.Req lbRequest = entry.getValue(); + try { + MDC.put("CloudSolrClient.url", url); + final CompletableFuture<NamedList<Object>> future = new CompletableFuture<>(); + if (isAsyncRequest) { + CompletableFuture<LBSolrClient.Rsp> reqFuture = getLbClient().requestAsync(lbRequest); + reqFuture.whenComplete((result, error) -> { + if (!reqFuture.isCompletedExceptionally()) { + future.complete(result.getResponse()); + } else { + future.completeExceptionally(error); + } + }); + } else { + threadPool.submit(() -> { + try { + future.complete(getLbClient().request(lbRequest).getResponse()); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + + responseFutures.put(url, future.whenComplete((result, error) -> { Review comment: Also, there needs to be synchronization to avoid concurrent population of shardResponses & exceptions structures. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org