bharathv commented on a change in pull request #2130:
URL: https://github.com/apache/hbase/pull/2130#discussion_r464801901



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -2931,6 +2935,27 @@ public GetActiveMasterResponse 
getActiveMaster(RpcController rpcController,
     return resp.build();
   }
 
+  @Override
+  public GetMastersResponse getMasters(RpcController rpcController, 
GetMastersRequest request)
+      throws ServiceException {
+    GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
+    // Active master
+    Optional<ServerName> serverName = master.getActiveMaster();
+    serverName.ifPresent(name -> 
resp.addMasterServers(GetMastersResponseEntry.newBuilder()
+        
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
+    // Backup masters
+    try {
+      // TODO: Cache the backup masters to avoid a ZK RPC for each 
getMasters() call.

Review comment:
       Right.

##########
File path: 
hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
##########
@@ -121,6 +121,11 @@ public boolean hasCellBlockSupport() {
     @Override
     public void callMethod(MethodDescriptor method, RpcController controller, 
Message request,
       Message responsePrototype, RpcCallback<Message> done) {
+      if (!method.getName().equals("GetClusterId")) {
+        // Master registry internally runs other RPCs to keep the master list 
up to date. This check

Review comment:
       Will add more detail. That is needed because of the way the test is 
written. This RpcChannel implementation intercepts all the mock RPCs from unit 
tests and the just counts the getClusterId calls (depending on the index).. 
With the patch a single GetClusterID() RPC failure can trigger an extra 
getMasters() call and that is accounted too.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);
+    masterAddrRefresherThread.start();

Review comment:
       Ok switched. I didn't want to have extra layers on top of a simple 
thread, but I guess a pool is more readable.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);
+    masterAddrRefresherThread.start();

Review comment:
       There is a start() in the c'tor? (Its hiding between the long comment 
chains). Anyway I switched to a single threaded pool per the suggestions. 
Hopefully its more clear.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);

Review comment:
       Ok switched. I think we should use our internal one, but fine.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);

Review comment:
       You mean between words?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) 
throws UnknownHostExcepti
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have 
some spurious

Review comment:
       My thinking was that since the RPC is relatively inexpensive and is not 
in any hot path, we could refresh it for any exception. One small optimization 
we could do is to do it only "connect" exceptions (timeouts/closed channel etc).
   
   > RPC failure results in more RPCs has me suspicious...
   
   Hmm. It's only on a best effort basis we try to keep the list up to date. If 
no master is reachable, I think we will be in a deeper trouble. I see your 
point though. You think we should have some kind of back-off on this thread if 
we are in that state where all the RPCs are failing and we have repeated 
getMasters() lookups?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -89,11 +97,17 @@
   private final int hedgedReadFanOut;
 
   // Configured list of masters to probe the meta information from.
-  private final ImmutableMap<ServerName, ClientMetaService.Interface> 
masterAddr2Stub;
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> 
masterAddr2Stub;
 
   // RPC client used to talk to the masters.
   private final RpcClient rpcClient;
   private final RpcControllerFactory rpcControllerFactory;
+  private final int rpcTimeoutMs;
+  // For synchronizing on refreshing the master end-points
+  private final Object refreshMasters = new Object();

Review comment:
       No, it's loosely tied to a single connection (there are some places that 
create on the fly registries which is something that can be fixed). 
   
   A single application can connect to multiple clusters which means we cannot 
make it static.

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
##########
@@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is 
dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = 
Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + 
currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the 
hedged RPC fails.
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> 
!registry.getParsedMasterServers().equals(masters));

Review comment:
       Skipped it because it bloats up the code, we need to expand the lambda. 
I think the intent there is pretty clear if the  test fails. Let me know if you 
feel strongly, I can change it.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);

Review comment:
       Done.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);

Review comment:
       > Should this interrupt be accompanied by a shutdown of the managing 
thread pool?
   
   This is InterruptedException thrown by wait() call. try-catch seems like the 
right way to do it? (unless I misunderstood what you are suggesting).

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());

Review comment:
       I haven't thought about this. Are you talking about this RPC 
specifically? If so, may I ask why? 
   
   Adding it on the server makes sense to me, like we want some metrics around 
where most of the time is spent (grouped by RPC) but curious what purpose it 
serves on client. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to