apurtell commented on a change in pull request #954: HBASE-23305: Master based 
registry implementation
URL: https://github.com/apache/hbase/pull/954#discussion_r360541305
 
 

 ##########
 File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
 ##########
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
+import static 
org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_NUM_HEDGED_REQS_DEFAULT;
+import static 
org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_NUM_HEDGED_REQS_KEY;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
+
+/**
+ * Master based registry implementation. Makes RPCs to the configured master 
addresses from config
+ * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
+ *
+ * It has the ability to burst the same RPC to multiple masters as a batch and 
returns whatever
+ * comes back first (a.k.a hedged RPCs). Number of target masters in a single 
batch is controlled
+ * via {@value 
org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_NUM_HEDGED_REQS_KEY}. If it 
is
+ * set to 1 (default), it is equivalent to picking a random master from the 
configured list.
+ *
+ * TODO: Handle changes to the configuration dynamically without having to 
restart the client.
+ */
+@InterfaceAudience.Private
+public class MasterRegistry implements AsyncRegistry {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MasterRegistry.class);
+
+  // Configured list of masters to probe the meta information from. Populated from
+  // MASTER_ADDRS_KEY by parseMasterAddrs() and shuffled there.
+  private final List<ServerName> masterServers;
+  // Controls the fan out of the hedged requests. Requests are made in batches of this number
+  // until all the servers are exhausted. The first returned result is passed back to the client.
+  // The constructor rejects values below 1.
+  private final int requestFanOut;
+  // Pool that runs hedged RPCs. Only created by the constructor when requestFanOut > 1;
+  // otherwise it stays null.
+  private ExecutorService masterRpcPool;
+
+  // RPC client used to talk to the masters.
+  private final RpcClient rpcClient;
+  // Controller factory instantiated from the configuration in the constructor.
+  private final RpcControllerFactory rpcControllerFactory;
+  // RPC timeout (HBASE_RPC_TIMEOUT_KEY, configured in ms) converted to nanoseconds.
+  // NOTE(review): stored as an int, so the constructor clamps it to Integer.MAX_VALUE ns
+  // (~2.1s); any larger configured timeout is silently shortened. Consider a long (or ms).
+  private final int rpcTimeoutNs;
+
+  /**
+   * Callback that performs a single RPC against one master. Callers implement it so that
+   * doRPCs() can hedge the same request across multiple masters without knowing what the
+   * request is.
+   * @param <RESP> Response type produced by the RPC.
+   */
+  @FunctionalInterface
+  @VisibleForTesting
+  public interface RpcCall<RESP> {
+    /**
+     * Issues the RPC.
+     * @param masterStub blocking stub bound to the master being called.
+     * @return the response returned by the master.
+     * @throws ServiceException if the RPC fails.
+     */
+    RESP doRpc(ClientMetaService.BlockingInterface masterStub) throws ServiceException;
+  }
+
+  /**
+   * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
+   * synchronize on multiple RPCs to different masters fetching the result. All the methods are
+   * thread-safe.
+   * <p>
+   * Declared static because it uses no state of the enclosing registry (only the static LOG).
+   * @param <RESP> Return response type for the RPCs.
+   */
+  private static class BatchRpcCtx<RESP> {
+    // Result set by the thread finishing first. Set only once.
+    private final AtomicReference<RESP> result;
+    // Caller waits on this latch being counted down.
+    private final CountDownLatch resultsReady;
+    // Book-keeping for number of failed RPCs.
+    private final AtomicInteger failedRPCs;
+
+    BatchRpcCtx() {
+      result = new AtomicReference<>();
+      // Count of 1, so that the first successful RPC result releases the waiting caller.
+      resultsReady = new CountDownLatch(1);
+      failedRPCs = new AtomicInteger(0);
+    }
+
+    /**
+     * Sets the result only if it is not already set by another thread. The thread that
+     * successfully sets the result also counts down the latch, releasing the waiter.
+     * @param result Result to be set.
+     */
+    public void setResultIfNotSet(RESP result) {
+      if (this.result.compareAndSet(null, result)) {
+        resultsReady.countDown();
+      }
+    }
+
+    /**
+     * Caller can use this method to wait for results to be fetched.
+     * @param timeoutNs Waits until this timeout (in nanoseconds) hits or the results are set,
+     *                  whichever happens first. Takes a long so timeouts above
+     *                  Integer.MAX_VALUE ns (~2.1s) can be expressed; int arguments still widen.
+     * @return True if the results are ready. False on timeout, or if the wait was interrupted,
+     *         in which case the thread's interrupt status is restored before returning.
+     */
+    public boolean waitForResults(long timeoutNs) {
+      try {
+        return resultsReady.await(timeoutNs, TimeUnit.NANOSECONDS);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
+        // Restore the interrupt flag so code further up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+
+    /**
+     * Helper to increment the number of failed RPCs.
+     */
+    public void incrementFailedRPCs() {
+      failedRPCs.incrementAndGet();
+    }
+
+    /**
+     * Onus is on the caller to wait for the results and call this.
+     * @return the current result, or null if no RPC has set one yet.
+     */
+    public RESP getResult() {
+      return result.get();
+    }
+  }
+
+  /**
+   * A runnable implementation of an RPC call to a given master. Updates the 
results in a shared
+   * rpc context.
+   * @param <RESP> Response type of the RPC.
+   */
+  private class MasterRpc<RESP> implements Runnable {
+    private final BatchRpcCtx<RESP> rpcCtx;
+    private final ServerName master;
+    private final RpcCall<RESP> rpcCall;
+    private final Function<RESP, Boolean> isValidResp;
+    private final String debugStr;
+
+    MasterRpc(BatchRpcCtx<RESP> rpcCtx, ServerName master, RpcCall<RESP> 
rpcCall,
+       Function<RESP, Boolean> isValidResp, String debugStr) {
+      this.rpcCtx = rpcCtx;
+      this.master = master;
+      this.rpcCall = rpcCall;
+      this.isValidResp = isValidResp;
+      this.debugStr = debugStr;
+    }
+
+    @Override
+    public void run() {
+      try {
+        RESP resp = rpcCall.doRpc(getMasterStub(master));
+        if (isValidResp.apply(resp)) {
+          // Valid result, set if not set by other threads.
+          rpcCtx.setResultIfNotSet(resp);
+          return;
+        }
+      } catch (Exception e) {
+        LOG.warn("Error calling {} on master {}. Trying other masters.", 
debugStr, master, e);
+      }
+      rpcCtx.incrementFailedRPCs();
+    }
+  }
+
+  /**
+   * Creates a registry that talks to the masters listed in the configuration.
+   * @param conf Configuration providing the master addresses, the hedged request fan out and
+   *             the RPC timeout.
+   * @throws IllegalArgumentException if the fan out is less than 1 or the parsed master list
+   *         is empty.
+   */
+  MasterRegistry(Configuration conf) {
+    masterServers = new ArrayList<>();
+    requestFanOut =
+        conf.getInt(MASTER_REGISTRY_NUM_HEDGED_REQS_KEY, MASTER_REGISTRY_NUM_HEDGED_REQS_DEFAULT);
+    Preconditions.checkArgument(requestFanOut >= 1, "Invalid value for %s: %s, must be >= 1",
+        MASTER_REGISTRY_NUM_HEDGED_REQS_KEY, requestFanOut);
+    // Parse (and validate) the master list before starting any threads, so a bad configuration
+    // cannot leak an already-created thread pool.
+    parseMasterAddrs(conf);
+    if (requestFanOut > 1) {
+      masterRpcPool = Executors.newFixedThreadPool(requestFanOut,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MasterRegistryRPC-%d").build());
+    }
+    // NOTE(review): rpcTimeoutNs is an int, so this clamps the timeout to Integer.MAX_VALUE ns
+    // (~2.1s); any larger configured timeout is silently shortened. Consider storing a long.
+    rpcTimeoutNs = (int) Math.min(Integer.MAX_VALUE,
+        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY,
+        DEFAULT_HBASE_RPC_TIMEOUT)));
+    // TODO(HBASE-23330): Fix clients using cluster ID based token auth.
+    rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+  }
+
+  /**
+   * Parses the comma separated list of master addresses from the provided configuration into
+   * masterServers, then shuffles it.
+   * <p>
+   * Whitespace around each entry is trimmed and blank entries are skipped. This makes the
+   * emptiness check meaningful: String.split always returns at least one element, so without
+   * the filtering the check could never fire.
+   * @param conf Configuration to parse from.
+   * @throws IllegalArgumentException if no master address is configured.
+   */
+  private void parseMasterAddrs(Configuration conf) {
+    String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT);
+    if (configuredMasters != null) {
+      for (String masterAddr : configuredMasters.split(",")) {
+        String trimmedAddr = masterAddr.trim();
+        if (!trimmedAddr.isEmpty()) {
+          masterServers.add(ServerName.valueOf(trimmedAddr, ServerName.NON_STARTCODE));
+        }
+      }
+    }
+    Preconditions.checkArgument(!masterServers.isEmpty(),
+        "No master address configured in %s", MASTER_ADDRS_KEY);
+    // (Pseudo) Randomized so that not all clients hot spot the same set of masters.
+    Collections.shuffle(masterServers);
+  }
+
+  /**
+   * Makes a given RPC to master servers.
+   * @param rpcCall Call to make.
+   * @param debug String used for debug logging the RPC details.
+   * @param <RESP> Response type for the RPC.
+   * @param isvalidResp Used to verify if the response returned from RPC is 
valid.
+   * @return Optional response from the RPCs to parsed masters.
+   */
+  @VisibleForTesting
+  <RESP> Optional<RESP> doRPCs(RpcCall<RESP> rpcCall,
+      Function<RESP, Boolean> isvalidResp, String debug) {
+    if (requestFanOut == 1) {
 
 Review comment:
   I was expecting that, for the first cut, we'd send just one request to a random host on the list at a time, and retry with another random choice on failure. (So the randomization of the list above is good and important.) This is what the ZooKeeper client does now, so it is no different from the current state of play.
   
   Hedged reading is ahead of the game.
   
   Good that it is off by default, though.
   
   Also, it's nice that the fan-out factor is configurable, but I would want an adaptive policy by default. Only if single requests are failing at some unacceptable rate (I suppose controlled by a config param) would you want to start sending more than one request per call, in exchange for, hopefully, faster responses on average.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to