bharathv commented on a change in pull request #954: HBASE-23305: Master based registry implementation URL: https://github.com/apache/hbase/pull/954#discussion_r362716840
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.PrettyPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; + +/** + * A non-blocking implementation of RpcChannel that hedges requests to multiple service 
end points. + * First received response is returned to the caller. This abstracts out the logic needed to batch + * requests to multiple end points underneath and presents itself as a single logical RpcChannel to + * the client. + * + * Hedging Details: + * --------------- + * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address + * end points to make the call to. We do multiple iterations until we get a proper response to the + * rpc call or all the service addresses are exhausted, whichever happens first. + * + * - We randomize the addresses up front so that the batch order per client is non-deterministic. + * This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'. + * Higher fanOutSize implies we make more rpc calls in a single batch. One needs to be mindful of the + * load on the client and server side when configuring the fan out. + * + * - In a happy case, once we receive a response from one end point, we cancel all the + * other inflight rpcs in the same batch and return the response to the caller. If we do not get a + * valid response from any address end point, we propagate the error back to the caller. + * + * - Rpc timeouts are applied to every hedged rpc. + * + * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can + * be hedged (for example: cluster state changing rpcs). + * + * (TODO) Retries and Adaptive hedging policy: + * ------------------------------------------ + * + * - No retries are handled at the channel level. Retries can be built in upper layers. However, the + * question is, do we even need retries? Hedging in fact is a substitute for retries. + * + * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging + * policy more adaptive. 
In most happy cases, the rpcs from the first few end points should return + * right away (especially short lived rpcs, that do not take up much time). In such cases, hedging + * is not needed. So, the idea is to make this request pattern pluggable so that the requests are + * hedged only when needed. + */ +class HedgedRpcChannel implements RpcChannel { + private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class); + + private final AbstractRpcClient rpcClient; + // List of service addresses to hedge the requests to. + private final List<InetSocketAddress> addrs; + private final User ticket; + private final int rpcTimeout; + // Controls the size of request fan out (number of rpcs per a single batch). + private final int fanOutSize; + + /** + * A simple rpc call back implementation to notify the batch context if any rpc is successful. + */ + private static class BatchRpcCtxCallBack implements RpcCallback<Message> { + private final BatchRpcCtx batchRpcCtx; + private final HBaseRpcController rpcController; + BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) { + this.batchRpcCtx = batchRpcCtx; + this.rpcController = rpcController; + } + @Override + public void run(Message result) { + batchRpcCtx.setResultIfNotSet(result, rpcController); + } + } + + /** + * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to + * synchronize on multiple RPCs to different end points fetching the result. All the methods are + * thread-safe. + */ + private static class BatchRpcCtx { + // Result set by the thread finishing first. Set only once. + private final AtomicReference<Message> result = new AtomicReference<>(); + // Caller waits on this latch being set. + // We set this to 1, so that the first successful RPC result is returned to the client. + private CountDownLatch resultsReady = new CountDownLatch(1); + // Failed rpc book-keeping. 
+ private AtomicInteger failedRpcCount = new AtomicInteger(); + // All the call handles for this batch. + private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>()); + + // Target addresses. + private final List<InetSocketAddress> addresses; + // Called when the result is ready. + private final RpcCallback<Message> callBack; + // Last failed rpc's exception. Used to propagate the reason to the controller. + private IOException lastFailedRpcReason; + + + BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) { + this.addresses = addresses; + this.callBack = Preconditions.checkNotNull(callBack); + } + + /** + * Sets the result only if it is not already set by another thread. Thread that successfully + * sets the result also count downs the latch. + * @param result Result to be set. + */ + public void setResultIfNotSet(Message result, HBaseRpcController rpcController) { + if (result == null) { + incrementFailedRpcs(rpcController.getFailed()); + return; + } + if (this.result.compareAndSet(null, result)) { + resultsReady.countDown(); + // Cancel all pending in flight calls. + for (Call call: callsInFlight) { + // It is ok to do it for all calls as it is a no-op if the call is already done. + call.setException(new CallCancelledException("Hedged call succeeded.")); + } + } + } + + /** + * Waits until the results are populated and calls the callback if the call is successful. + * @return true for successful rpc and false otherwise. + */ + public boolean waitForResults() { + try { + // We do not set a timeout on await() because we rely on the underlying RPCs to timeout if + // something on the remote is broken. Worst case we should wait for rpc time out to kick in. + resultsReady.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for batched master RPC results. 
Aborting wait.", e); + } + Message message = result.get(); + if (message != null) { + callBack.run(message); + return true; + } + return false; + } + + public void addCallInFlight(Call c) { + callsInFlight.add(c); + } + + public void incrementFailedRpcs(IOException reason) { + if (failedRpcCount.incrementAndGet() == addresses.size()) { + lastFailedRpcReason = reason; + // All the rpcs in this batch have failed. Invoke the waiting threads. + resultsReady.countDown(); + } + } + + public IOException getLastFailedRpcReason() { + return lastFailedRpcReason; + } + + @Override + public String toString() { + return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses)); + } + } + + public HedgedRpcChannel(AbstractRpcClient<?> rpcClient, List<InetSocketAddress> addrs, + User ticket, int rpcTimeout, int fanOutSize) { Review comment: Clarified. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services