charlesconnell commented on code in PR #6167:
URL: https://github.com/apache/hbase/pull/6167#discussion_r1721036978


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -822,7 +826,25 @@ private <S, R> void onLocateComplete(Function<RpcChannel, 
S> stubMaker,
         } else {
           callback.onRegionComplete(region, r);
         }
-        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+        ServiceCaller<S, R> updatedCallable = callback.getNextCallable(r, 
region);

Review Comment:
   This logic is as suggested by @bbeaudreault on the mailing list 
https://lists.apache.org/thread/1vqnxb71z7swq2cogz4qg3cn6b10xp4v



##########
hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java:
##########
@@ -131,32 +175,38 @@ private static byte[] nullToEmpty(byte[] b) {
     CompletableFuture<R> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
-      req = validateArgAndGetPB(scan, ci, false);
+      req = validateArgAndGetPB(scan, ci, false, true);
     } catch (IOException e) {
       future.completeExceptionally(e);
       return future;
     }
-    AbstractAggregationCallback<R> callback = new 
AbstractAggregationCallback<R>(future) {
+    AbstractAggregationCallback<R> callback =
+      new AbstractAggregationCallback<>(future, req, AggregateService::getMax) 
{
 
-      private R max;
+        private R max;
+        private final Object lock = new Object();

Review Comment:
   I'm adding concurrency safety to the aggregation callbacks. The aggregate() 
methods can be called by multiple threads at the same time. getFinalResult() in 
theory could be called by yet a different thread. The aggregation operations 
themselves need their critical sections synchronized, and the last aggregate() 
must happen-before getFinalResult().



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java:
##########
@@ -296,31 +296,16 @@ public <S, R> CompletableFuture<R> 
coprocessorService(Function<RpcChannel, S> st
   public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
     Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
     CoprocessorCallback<R> callback) {
-    final Context context = Context.current();
-    CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
-
-      @Override
-      public void onRegionComplete(RegionInfo region, R resp) {
-        pool.execute(context.wrap(() -> callback.onRegionComplete(region, 
resp)));
-      }
-
-      @Override
-      public void onRegionError(RegionInfo region, Throwable error) {
-        pool.execute(context.wrap(() -> callback.onRegionError(region, 
error)));
-      }
-
-      @Override
-      public void onComplete() {
-        pool.execute(context.wrap(callback::onComplete));
-      }
+    return coprocessorService(stubMaker, callable,
+      (PartialResultCoprocessorCallback<S, R>) callback);
+  }
 
-      @Override
-      public void onError(Throwable error) {
-        pool.execute(context.wrap(() -> callback.onError(error)));
-      }
-    };
+  @Override
+  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
+    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
+    PartialResultCoprocessorCallback<S, R> callback) {
     CoprocessorServiceBuilder<S, R> builder =
-      rawTable.coprocessorService(stubMaker, callable, wrappedCallback);

Review Comment:
   Workaround for https://issues.apache.org/jira/browse/HBASE-28792. I'll do 
that ticket before I let this get merged in in case this isn't the desired 
solution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to