Thank you for the feedback. I will work on the approach that Bryan
suggests. I will need
https://issues.apache.org/jira/browse/HBASE-28346 merged before I can
put up a PR.

On Sat, Jul 20, 2024 at 11:34 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:
>
> Ah, you are right, we can add a flag to let the new client set it to a
> non default value.
>
> In this way I prefer we implement the 'partial result' logic. Sleeping
> at server side is not a good idea.
>
> Bryan Beaudreault <bbeaudrea...@apache.org> 于2024年7月20日周六 23:09写道:
> >
> > Since the protocol is protobuf, it should be quite simple. We can add a new
> > field supports_partial to the AggregationRequest proto. Only new clients
> > would set this to true, and that would trigger the partial results on the
> > client. We have a similar concept for how we handle supporting
> > retryImmediately for multi
> >
> > On Sat, Jul 20, 2024 at 9:06 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:
> >
> > > I do not think it is easy to change the current implementation to be
> > > 'partial results'.
> > >
> > > The current assumption of request/response is 'send a range and the
> > > agg type'/'return the agg result of the whole range'.
> > >
> > > If you want to make it possible to return earlier, I think we need to
> > > tell the client that
> > > 1. Whether the agg result is for the whole range
> > > 2. If not, next time you should start from which row
> > >
> > > So at least we need to add something to the response protobuf message,
> > > and since there is no version info in the request, I do not know how
> > > to do this without breaking existing clients...
> > >
> > > Maybe we can just deprecate the current aggregation service, and
> > > introduce aggregation v2?
> > >
> > > And on the broken logic for onLocateComplete, Bryan, could you please
> > > provide a UT so I can verify the problem?
> > >
> > > Thanks.
> > >
> > > Bryan Beaudreault <bbeaudrea...@apache.org> 于2024年7月17日周三 23:01写道:
> > > >
> > > > I forgot to respond to your last question.
> > > >
> > > > I'm looking through the implementation of the following classes/methods:
> > > > - RawAsyncTableImpl - onLocateComplete()
> > > > - CoprocessorServiceBuilderImpl (inner class of above)  - execute()
> > > > - CoprocessorCallback (inner interface of AsyncTable)
> > > > - AsyncAggregationClient
> > > >
> > > > So we already use future chaining here. When AsyncAggregationClient is
> > > > invoked, a CompletableFuture is created and stashed into the
> > > > AbstractAggregationCallback and then
> > > > CoprocessorServiceBuilderImpl.execute() is called. In execute(), an 
> > > > async
> > > > region lookup occurs and it uses addListener to call the
> > > onLocateComplete()
> > > > callback when that finishes. In onLocateComplete(), it makes an async
> > > call
> > > > to the callable from AsyncAggregationClient (i.e. () -> 
> > > > stub.rowCount()).
> > > > It uses addListener again to handle the response, calling either
> > > > onRegionComplete or onRegionError and decrementing an unfinishedRequest
> > > > counter. The AbstractAggregationCallback in AsyncAggregationClient adds
> > > an
> > > > implementation for onRegionComplete which passes AggregateResponse to
> > > > aggregate(). Once unfinishedRequests equals 0, it calls
> > > > callback.onComplete(). The implementation of callback.onComplete() gets
> > > the
> > > > final aggregated results and passes that into future.complete(). At this
> > > > point the end-caller sees their response finish.
> > > >
> > > > So that explains how you can do a series of async calls before finally
> > > > completing the top-level future. Unfortunately the interfaces involved
> > > here
> > > > make adding the specific chaining we want to do a bit more complicated.
> > > >
> > > > In an ideal world I think we'd be able to
> > > > update AbstractAggregationCallback.onRegionComplete() to handle a new
> > > > AggregateResponse.isPartial() field. In this case, it'd call aggregate 
> > > > as
> > > > today, but then kick-off a follow-up RPC which would go back through the
> > > > above future chaining and eventually calling aggregate() again with new
> > > > data.  This process would continue until isPartial() is false.
> > > >
> > > > The problem is, we don't have access to the internals of
> > > RawAsyncTableImpl
> > > > in AbstractAggregationCallback or any other way to influence the flow. 
> > > > So
> > > > the first thing we need to do is figure out a change to the interfaces
> > > > which would allow a callback to kick-off sub-requests.  This is sort of
> > > > akin to how we handle retries in the other async callers, which I'm
> > > > noticing the CoprocessorService stuff does not seem to support at all.
> > > >
> > > > The main aspect of the flow that we need to influence is when
> > > > onLocateComplete() calls callback.onComplete() as this triggers the
> > > client
> > > > to return to the end-caller. One way to do it would be if right after
> > > > calling onRegionComplete() we decrement unfinishedCalls and do something
> > > > like:
> > > >
> > > > ServiceCaller<S, R> updatedCallable = callback.getNextCallable();
> > > > if (updatedCallable != null) {
> > > >   // this call is finished, so decrement. but we will kick off a new
> > > > one below, so don't
> > > >   // call onComplete
> > > >   unfinishedRequest.decrementAndGet();
> > > >   onLocateComplete(stubMaker, updatedCallable, callback, new
> > > > ArrayList<>(), endKey,
> > > >     endKeyInclusive, locateFinished, unfinishedRequest, loc, error);
> > > > } else if (unfinishedRequest.decrementAndGet() == 0 &&
> > > locateFinished.get()) {
> > > >   callback.onComplete();
> > > > }
> > > >
> > > > So if there is a next callable, this skips completing the callback and
> > > > instead recursively kicks back into onLocateComplete with the updated
> > > > callable for the remaining results. Our AbstractAggregationCallback 
> > > > class
> > > > would implement getNextCallable(AggregateResponse) with something like
> > > this:
> > > >
> > > > @Override
> > > > protected ServiceCaller<AggregateService, AggregateResponse>
> > > getNextCallable(
> > > >   AggregateResponse response) {
> > > >   if (!response.isPartial()) {
> > > >     return null;
> > > >   }
> > > >   return (stub, controller, rpcCallback) -> {
> > > >     AggregateRequest.Builder updatedRequest =
> > > > AggregateRequest.newBuilder(originalRequest);
> > > >     // todo: pull whatever field we want from the aggregate response
> > > > and set it in the next request
> > > >     //   so that the server can continue where it left off
> > > >     stub.getMax(controller, updatedRequest.build(), rpcCallback);
> > > >   };
> > > > }
> > > >
> > > >
> > > > So with this approach we need to add a getNextCallable() method to the
> > > > CoprocessorCallback interface. I'm not sure if this is the cleanest way
> > > to
> > > > do it, but it's one way that I think would work.
> > > >
> > > > This does not cover the backoff portion. For that, we similarly do not
> > > want
> > > > to sleep in an async client. Instead we'd want to use a HashedWheelTimer
> > > to
> > > > submit the follow-up request async. For example, instead of calling
> > > > onLocateComplete directly in my first snippet above, you'd call
> > > > HWT.newTimeout(t -> onLocateComplete(...)), backoffTime, 
> > > > backoffTimeUnit.
> > > >
> > > > Since Duo wrote most of this code, I wonder if he has any opinions on 
> > > > the
> > > > above recommendations or a different approach. Also, I think
> > > > onLocateComplete is sort of broken for coprocessor calls which span more
> > > > than 2 regions. It looks like we only handle executing against the first
> > > > region and last region, but none of the middle ones. This should 
> > > > probably
> > > > be filed as a bug.
> > > >
> > > > On Tue, Jul 16, 2024 at 6:57 PM Charles Connell
> > > > <cconn...@hubspot.com.invalid> wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I am considering contributing a PR that will allow the
> > > > > AggregationClient+AggregateImplementation coprocessor to respect quota
> > > > > throttling during its operations, or something in that spirit. I want
> > > > > to gather input from the community on your thoughts about this. In
> > > > > particular here are some questions:
> > > > >
> > > > > 1. Is this idea useful enough to be accepted as a contribution?
> > > > > 2. Would you prefer a change to the existing coprocessor, or a new
> > > > > coprocessor?
> > > > > 3. One way to accomplish this is for the coprocessor endpoint to
> > > > > return a partial result to the client each time it encounters an
> > > > > RpcThrottleException. The client would then sleep and send a new
> > > > > request, repeatedly, until the partial results cumulatively
> > > > > represented the full scan it wanted. Another way to accomplish this is
> > > > > for the coprocessor to sleep on the server side each time it
> > > > > encounters an RpcThrottleException, and return a single complete
> > > > > result. There are pros and cons to each of these approaches. I don't
> > > > > believe there is prior art for either partial results or
> > > > > server-side-sleeps in HBase, so both ideas feel awkward to me. What do
> > > > > you think?
> > > > > 4. If we go with the "partial results" idea, this seems hard to
> > > > > accomplish in an async client. Does anyone have an example of how to
> > > > > cleanly encapsulate a multi-request-response cycle of async requests
> > > > > inside a CompletableFuture? I wonder if this is maybe not possible.
> > > > >
> > > > > Thanks for your time.
> > > > > Charles
> > > > >
> > > > >
> > >
>

Reply via email to