bharathv commented on a change in pull request #954: HBASE-23305: Master based registry implementation
URL: https://github.com/apache/hbase/pull/954#discussion_r362695859
 
 

 ##########
 File path: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
 ##########
 @@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.PrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+/**
+ * A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
+ * First received response is returned to the caller. This abstracts out the logic needed to batch
+ * requests to multiple end points underneath and presents itself as a single logical RpcChannel to
+ * the client.
+ *
+ * Hedging Details:
+ * ---------------
+ * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
+ * end points to make the call to. We do multiple iterations until we get a proper response to the
+ * rpc call or all the service addresses are exhausted, whichever happens first.
+ *
+ * - We randomize the addresses up front so that the batch order per client is non-deterministic.
+ * This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
+ * A higher fanOutSize implies we make more rpc calls in a single batch. One needs to be mindful of
+ * the load on the client and server side when configuring the fan out.
+ *
+ * - In the happy case, once we receive a response from one end point, we cancel all the
+ * other in-flight rpcs in the same batch and return the response to the caller. If we do not get a
+ * valid response from any address end point, we propagate the error back to the caller.
+ *
+ * - Rpc timeouts are applied to every hedged rpc.
+ *
+ * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
+ * be hedged (for example: cluster state changing rpcs).
+ *
+ * (TODO) Retries and Adaptive hedging policy:
+ * ------------------------------------------
+ *
+ * - No retries are handled at the channel level. Retries can be built in upper layers. However, the
+ * question is, do we even need retries? Hedging is in fact a substitute for retries.
+ *
+ * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
+ * policy more adaptive. In most happy cases, the rpcs from the first few end points should return
+ * right away (especially short-lived rpcs, which do not take up much time). In such cases, hedging
+ * is not needed. So, the idea is to make this request pattern pluggable so that the requests are
+ * hedged only when needed.
+ */
+class HedgedRpcChannel implements RpcChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
+
+  private final AbstractRpcClient<?> rpcClient;
+  // List of service addresses to hedge the requests to.
+  private final List<InetSocketAddress> addrs;
+  private final User ticket;
+  private final int rpcTimeout;
+  // Controls the size of the request fan out (number of rpcs per batch).
+  private final int fanOutSize;
+
+  /**
+   * A simple rpc callback implementation to notify the batch context if any rpc is successful.
+   */
+  private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
+    private final BatchRpcCtx batchRpcCtx;
+    private final HBaseRpcController rpcController;
+    BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
+      this.batchRpcCtx = batchRpcCtx;
+      this.rpcController = rpcController;
+    }
+    @Override
+    public void run(Message result) {
+      batchRpcCtx.setResultIfNotSet(result, rpcController);
+    }
+  }
+
+  /**
+   * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
+   * synchronize on multiple RPCs to different end points fetching the result. All the methods are
+   * thread-safe.
+   */
+  private static class BatchRpcCtx {
+    // Result set by the thread finishing first. Set only once.
+    private final AtomicReference<Message> result = new AtomicReference<>();
+    // Caller waits on this latch being set.
+    // We set this to 1, so that the first successful RPC result is returned to the client.
+    private final CountDownLatch resultsReady = new CountDownLatch(1);
+    // Failed rpc book-keeping.
+    private final AtomicInteger failedRpcCount = new AtomicInteger();
+    // All the call handles for this batch.
+    private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
+
+    // Target addresses.
+    private final List<InetSocketAddress> addresses;
+    // Called when the result is ready.
+    private final RpcCallback<Message> callBack;
+    // Last failed rpc's exception. Used to propagate the reason to the controller.
+    private IOException lastFailedRpcReason;
+
+    BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
+      this.addresses = addresses;
+      this.callBack = Preconditions.checkNotNull(callBack);
+    }
+
+    /**
+     * Sets the result only if it is not already set by another thread. The thread that
+     * successfully sets the result also counts down the latch.
+     * @param result Result to be set.
+     */
+    public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
+      if (result == null) {
+        incrementFailedRpcs(rpcController.getFailed());
+        return;
+      }
+      if (this.result.compareAndSet(null, result)) {
+        resultsReady.countDown();
+        // Cancel all pending in-flight calls.
+        for (Call call: callsInFlight) {
+          // It is ok to do it for all calls as it is a no-op if the call is already done.
+          call.setException(new CallCancelledException("Hedged call succeeded."));
+        }
+      }
+    }
+
+    /**
+     * Waits until the results are populated and calls the callback if the call is successful.
+     * @return true for a successful rpc and false otherwise.
+     */
+    public boolean waitForResults() {
+      try {
+        // We do not set a timeout on await() because we rely on the underlying RPCs to time out if
+        // something on the remote is broken. Worst case we should wait for the rpc timeout to
+        // kick in.
+        resultsReady.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
+        // Restore the interrupt status so that callers up the stack can react to it.
+        Thread.currentThread().interrupt();
+      }
+      Message message = result.get();
+      if (message != null) {
+        callBack.run(message);
+        return true;
+      }
+      return false;
+    }
+
+    public void addCallInFlight(Call c) {
+      callsInFlight.add(c);
+    }
+
+    public void incrementFailedRpcs(IOException reason) {
+      if (failedRpcCount.incrementAndGet() == addresses.size()) {
+        lastFailedRpcReason = reason;
+        // All the rpcs in this batch have failed. Invoke the waiting threads.
+        resultsReady.countDown();
+      }
+    }
+
+    public IOException getLastFailedRpcReason() {
+      return lastFailedRpcReason;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
+    }
+  }
+
+  public HedgedRpcChannel(AbstractRpcClient<?> rpcClient, List<InetSocketAddress> addrs,
+      User ticket, int rpcTimeout, int fanOutSize) {
+    this.rpcClient = rpcClient;
+    // Defensive copy, since we shuffle the list below and must not mutate the caller's copy.
+    this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
+    Preconditions.checkArgument(this.addrs.size() >= 1);
+    // For a non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
+    // order, creating hot spots on the service end points.
+    Collections.shuffle(this.addrs);
+    this.ticket = ticket;
+    this.rpcTimeout = rpcTimeout;
+    this.fanOutSize = fanOutSize;
+  }
+
+  private HBaseRpcController applyRpcTimeout(RpcController controller) {
+    // There is no reason to use any other implementation of RpcController.
+    Preconditions.checkState(controller instanceof HBaseRpcController);
+    HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
+    int rpcTimeoutToSet =
+        hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
+    HBaseRpcController response = new HBaseRpcControllerImpl();
+    response.setCallTimeout(rpcTimeoutToSet);
+    return response;
+  }
+
+  public void doCallMethod(Descriptors.MethodDescriptor method, RpcController controller,
+      Message request, Message responsePrototype, RpcCallback<Message> done) {
+    int i = 0;
+    BatchRpcCtx lastBatchCtx = null;
+    while (i < addrs.size()) {
+      // Each iteration picks fanOutSize addresses to run as a batch.
+      int batchEnd = Math.min(addrs.size(), i + fanOutSize);
+      List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
+      BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
+      lastBatchCtx = batchRpcCtx;
+      LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
+      for (InetSocketAddress address : addrSubList) {
+        HBaseRpcController rpcController = applyRpcTimeout(controller);
+        // ** WARN ** This is a blocking call if the underlying connection for the rpc client is
+        // a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
+        // the write calls. Handling a blocking connection means that this should be run in a
+        // separate thread and hence more code complexity. Is it ok to handle only non-blocking
+        // connections? Should we have a check in the constructor if the underlying connection is
+        // a blocking impl and then log some warning?
+        batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
+            responsePrototype, ticket, address,
+            new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
+      }
+      if (batchRpcCtx.waitForResults()) {
+        return;
+      }
+      // TODO: Sleep between batches?
+      // The entire batch has failed, let's try the next batch.
+      LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
+      i = batchEnd;
+    }
+    // All the batches failed, mark it a failed rpc.
+    // Propagate the failure reason. We propagate the last batch's last failing rpc reason.
+    // Can we do something better?
+    ((HBaseRpcController) controller).setFailed(lastBatchCtx.getLastFailedRpcReason());
 
 Review comment:
   No, there is a Preconditions check in the c'tor after address parsing. Still added the null check.
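
   For readers following the hedging discussion in the class javadoc above: below is a minimal,
   self-contained sketch of the same batch-and-first-response-wins pattern, written with plain
   CompletableFutures. All names here (HedgeSketch, hedge, probe) are hypothetical and exist only
   to illustrate the control flow; the real channel drives HBase's RPC machinery through
   AbstractRpcClient#callMethod instead.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Function;

    public class HedgeSketch {
      /**
       * Tries 'targets' in shuffled batches of 'fanOutSize'. Within a batch the first successful
       * result wins and the losers are cancelled; if every call in a batch fails, the next batch
       * is attempted; if all batches fail, the last failure is propagated to the caller.
       */
      static <T> T hedge(List<String> targets, int fanOutSize,
          Function<String, CompletableFuture<T>> probe) throws Exception {
        if (targets.isEmpty()) {
          throw new IllegalArgumentException("Need at least one target to hedge against.");
        }
        // Randomize the batch order per client so that clients do not hammer the same end point.
        List<String> shuffled = new ArrayList<>(targets);
        Collections.shuffle(shuffled);
        Exception lastFailure = null;
        for (int i = 0; i < shuffled.size(); i += fanOutSize) {
          List<String> batch = shuffled.subList(i, Math.min(shuffled.size(), i + fanOutSize));
          CompletableFuture<T> winner = new CompletableFuture<>();
          AtomicInteger failed = new AtomicInteger();
          List<CompletableFuture<T>> inFlight = new ArrayList<>();
          for (String target : batch) {
            CompletableFuture<T> call = probe.apply(target);
            inFlight.add(call);
            call.whenComplete((result, error) -> {
              if (error == null) {
                winner.complete(result); // first success wins; later completions are no-ops
              } else if (failed.incrementAndGet() == batch.size()) {
                winner.completeExceptionally(error); // whole batch failed; surface the last reason
              }
            });
          }
          try {
            T result = winner.join();
            inFlight.forEach(f -> f.cancel(true)); // best-effort cancellation of the losers
            return result;
          } catch (CompletionException e) {
            lastFailure = e; // fall through and try the next batch
          }
        }
        throw lastFailure; // all batches exhausted
      }
    }

   The sketch leaves out per-call timeouts and controller state (see applyRpcTimeout in the patch),
   which the real channel has to handle for every hedged rpc.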

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
