ndimiduk commented on code in PR #6167: URL: https://github.com/apache/hbase/pull/6167#discussion_r1740969491
########## hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java: ########## @@ -92,6 +95,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); + private static final HashedWheelTimer HWT = new HashedWheelTimer( Review Comment: Is there an existing HWT that can be used for this purpose? Maybe something tied to the Connection? ########## hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java: ########## @@ -106,6 +126,31 @@ public synchronized void onComplete() { finished = true; future.complete(getFinalResult()); } + + @Override + public ServiceCaller<AggregateService, AggregateResponse> + getNextCallable(AggregateResponse response, RegionInfo region) { + if (!response.hasNextChunkStartRow()) { + return null; + } + return (stub, controller, rpcCallback) -> { + AggregateRequest.Builder updatedRequest = AggregateRequest.newBuilder(originalRequest); + updatedRequest.setScan(originalRequest.getScan().toBuilder() + .setStartRow(response.getNextChunkStartRow()).build()); + if (log.isTraceEnabled()) { + log.trace("Got incomplete result {} for original scan {}. Sending new request {}.", Review Comment: nit: "sending _next_ request" ? That is, this is considered to be part of the happy path, it is the case that an aggregation request can span multiple RPCs, so it's not a "new as in replacement" request, but a "new as in next in the chain of indeterminant length" request, right? ########## hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java: ########## @@ -131,32 +175,38 @@ private static byte[] nullToEmpty(byte[] b) { CompletableFuture<R> future = new CompletableFuture<>(); AggregateRequest req; try { - req = validateArgAndGetPB(scan, ci, false); + req = validateArgAndGetPB(scan, ci, false, true); } catch (IOException e) { future.completeExceptionally(e); return future; } - AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) { + AbstractAggregationCallback<R> callback = + new AbstractAggregationCallback<>(future, req, AggregateService::getMax) { - private R max; + private R max; + private final Object lock = new Object(); Review Comment: if multiple threads need to coordinate on retrieving this value, should they be synchronising on a final result? As in, should `getFinalResult` block on completion of a future Future instead of simply locking around accesses of `max` ? ########## hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java: ########## @@ -822,7 +830,30 @@ private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, } else { callback.onRegionComplete(region, r); } - if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { + + boolean complete = unfinishedRequest.decrementAndGet() == 0 && locateFinished.get(); + + if (e == null && r != null) { + ServiceCaller<S, R> updatedCallable = callback.getNextCallable(r, region); + if (updatedCallable != null) { + // We are launching a new request now, so un-set complete + complete = false; + long waitIntervalMs = callback.getWaitIntervalMs(r, region); + LOG.trace("Coprocessor returned incomplete result. " + + "Sleeping for {} millis before making follow-up request.", waitIntervalMs); + if (waitIntervalMs > 0) { + HWT.newTimeout( + (timeout) -> onLocateComplete(stubMaker, updatedCallable, callback, endKey, + endKeyInclusive, locateFinished, unfinishedRequest, loc, null), + waitIntervalMs, TimeUnit.MILLISECONDS); + } else { + onLocateComplete(stubMaker, updatedCallable, callback, endKey, endKeyInclusive, + locateFinished, unfinishedRequest, loc, null); + } + } Review Comment: I'm not very familiar with the Netty call-back code. This listener invokes again the `onLocateComplete` method, which effectively makes this call recursive? In that case, does it mean that the `if(complete)` body below will be potentially invoked multiple times as listeners are popped off the stack? That is, when `updatedCallable != null`, should the end of this if-block exit early, rather than executing all the way through to checking for completeness? Oh. Right. You set `complete = false` inside this block, so that even when control flow gets down there, nothing happens. I think this would be less confusing if the method exited after calling `onLocateComplete` rather than letting control continue through the end of the method. I dunno. Take this comment with a grain of salt. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org