ndimiduk commented on code in PR #6167:
URL: https://github.com/apache/hbase/pull/6167#discussion_r1740969491


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -92,6 +95,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
 
+  private static final HashedWheelTimer HWT = new HashedWheelTimer(

Review Comment:
   Is there an existing HWT that can be used for this purpose? Maybe something 
tied to the Connection?



##########
hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java:
##########
@@ -106,6 +126,31 @@ public synchronized void onComplete() {
       finished = true;
       future.complete(getFinalResult());
     }
+
+    @Override
+    public ServiceCaller<AggregateService, AggregateResponse>
+      getNextCallable(AggregateResponse response, RegionInfo region) {
+      if (!response.hasNextChunkStartRow()) {
+        return null;
+      }
+      return (stub, controller, rpcCallback) -> {
+        AggregateRequest.Builder updatedRequest = AggregateRequest.newBuilder(originalRequest);
+        updatedRequest.setScan(originalRequest.getScan().toBuilder()
+          .setStartRow(response.getNextChunkStartRow()).build());
+        if (log.isTraceEnabled()) {
+          log.trace("Got incomplete result {} for original scan {}. Sending new request {}.",

Review Comment:
   nit: "Sending _next_ request"?
   
   That is, this is part of the happy path: an aggregation request can span 
multiple RPCs, so this is not a "new as in replacement" request but a "new as 
in next in a chain of indeterminate length" request, right?
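   To make the "next in the chain" reading concrete, here is a minimal sketch 
of a client that keeps issuing follow-up requests until the response carries no 
next start row. Everything in it (`ChunkedSumSketch`, `Response`, `serve`, 
`sumAll`) is hypothetical and stands in for the real RPC machinery; it assumes 
a positive chunk size.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for a chunked aggregation; not HBase code.
class ChunkedSumSketch {
  /** Hypothetical response: a partial sum plus the next start row, if more rows remain. */
  static final class Response {
    final long partialSum;
    final OptionalInt nextStartRow;

    Response(long partialSum, OptionalInt nextStartRow) {
      this.partialSum = partialSum;
      this.nextStartRow = nextStartRow;
    }
  }

  /** Fake server: sums at most chunkSize rows beginning at startRow. */
  static Response serve(long[] rows, int startRow, int chunkSize) {
    int end = Math.min(rows.length, startRow + chunkSize);
    long sum = 0;
    for (int i = startRow; i < end; i++) {
      sum += rows[i];
    }
    return new Response(sum, end < rows.length ? OptionalInt.of(end) : OptionalInt.empty());
  }

  /** Client: each follow-up is the next link in the chain, not a replacement request. */
  static long sumAll(long[] rows, int chunkSize) {
    Response r = serve(rows, 0, chunkSize);
    long total = r.partialSum;
    while (r.nextStartRow.isPresent()) {
      r = serve(rows, r.nextStartRow.getAsInt(), chunkSize);
      total += r.partialSum;
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println(sumAll(new long[] {1, 2, 3, 4, 5}, 2));
  }
}
```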



##########
hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java:
##########
@@ -131,32 +175,38 @@ private static byte[] nullToEmpty(byte[] b) {
     CompletableFuture<R> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
-      req = validateArgAndGetPB(scan, ci, false);
+      req = validateArgAndGetPB(scan, ci, false, true);
     } catch (IOException e) {
       future.completeExceptionally(e);
       return future;
     }
-    AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
+    AbstractAggregationCallback<R> callback =
+      new AbstractAggregationCallback<>(future, req, AggregateService::getMax) {
 
-      private R max;
+        private R max;
+        private final Object lock = new Object();

Review Comment:
   If multiple threads need to coordinate on retrieving this value, should they 
be synchronising on the final result? As in, should `getFinalResult` block on 
completion of a `Future` instead of simply locking around accesses of `max`?
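   As a rough illustration of the alternative, the sketch below merges partial 
results with an atomic accumulate and exposes the final value only through a 
`CompletableFuture`, so readers wait on completion rather than taking a lock. 
The class and method names are hypothetical, and it assumes the number of 
partial results is known up front, which may not hold for the real callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; not the AbstractAggregationCallback from the PR.
class MaxAggregatorSketch {
  private final AtomicReference<Long> max = new AtomicReference<>(null);
  private final AtomicInteger pending;
  private final CompletableFuture<Long> result = new CompletableFuture<>();

  /** expectedPartials is an assumption: the count of partial results to wait for. */
  MaxAggregatorSketch(int expectedPartials) {
    this.pending = new AtomicInteger(expectedPartials);
  }

  /** May be called from any thread; merges one partial maximum without a lock. */
  void onPartial(long value) {
    max.accumulateAndGet(value, (cur, v) -> cur == null || v > cur ? v : cur);
    if (pending.decrementAndGet() == 0) {
      // The last partial publishes the merged value exactly once.
      result.complete(max.get());
    }
  }

  /** Callers block on (or chain from) this future instead of reading max directly. */
  CompletableFuture<Long> finalResult() {
    return result;
  }

  public static void main(String[] args) {
    MaxAggregatorSketch agg = new MaxAggregatorSketch(3);
    agg.onPartial(5);
    agg.onPartial(9);
    agg.onPartial(2);
    System.out.println(agg.finalResult().join());
  }
}
```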



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -822,7 +830,30 @@ private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
         } else {
           callback.onRegionComplete(region, r);
         }
-        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+
+        boolean complete = unfinishedRequest.decrementAndGet() == 0 && locateFinished.get();
+
+        if (e == null && r != null) {
+          ServiceCaller<S, R> updatedCallable = callback.getNextCallable(r, region);
+          if (updatedCallable != null) {
+            // We are launching a new request now, so un-set complete
+            complete = false;
+            long waitIntervalMs = callback.getWaitIntervalMs(r, region);
+            LOG.trace("Coprocessor returned incomplete result. "
+              + "Sleeping for {} millis before making follow-up request.", waitIntervalMs);
+            if (waitIntervalMs > 0) {
+              HWT.newTimeout(
+                (timeout) -> onLocateComplete(stubMaker, updatedCallable, callback, endKey,
+                  endKeyInclusive, locateFinished, unfinishedRequest, loc, null),
+                waitIntervalMs, TimeUnit.MILLISECONDS);
+            } else {
+              onLocateComplete(stubMaker, updatedCallable, callback, endKey, endKeyInclusive,
+                locateFinished, unfinishedRequest, loc, null);
+            }
+            }
+          }

Review Comment:
   I'm not very familiar with the Netty callback code. This listener invokes 
`onLocateComplete` again, which effectively makes the call recursive? In that 
case, could the `if (complete)` body below be invoked multiple times as 
listeners are popped off the stack? That is, when `updatedCallable != null`, 
should this if-block exit early rather than falling through to the check for 
completeness?
   
   Oh, right. You set `complete = false` inside this block, so nothing happens 
even when control flow gets down there. I think this would be less confusing if 
the method returned right after calling `onLocateComplete` rather than letting 
control continue through to the end of the method.
   
   I dunno. Take this comment with a grain of salt.
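   For what it's worth, this is the shape I have in mind, as a simplified 
sketch rather than a proposed patch. The names (`EarlyReturnSketch`, `onChunk`, 
`events`) are hypothetical, and unlike the PR it only decrements the counter on 
the path that has no follow-up, which is an assumption made to keep the example 
small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification of the callback flow; not HBase code.
class EarlyReturnSketch {
  final AtomicInteger unfinished = new AtomicInteger(1);
  final List<String> events = new ArrayList<>();

  /** Handles one response; remainingChunks > 0 means a follow-up request is needed. */
  void onChunk(int remainingChunks) {
    events.add("chunk");
    if (remainingChunks > 0) {
      // The follow-up now owns completion, so hand off and return immediately.
      onChunk(remainingChunks - 1);
      return;
    }
    // Only the last response in the chain reaches the completeness check.
    if (unfinished.decrementAndGet() == 0) {
      events.add("complete");
    }
  }

  public static void main(String[] args) {
    EarlyReturnSketch s = new EarlyReturnSketch();
    s.onChunk(2);
    System.out.println(s.events);
  }
}
```

   With the early return there is no `complete` flag to reset, and a reader 
can see at a glance that the completion branch runs once per chain.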



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org