bharathv commented on a change in pull request #2130: URL: https://github.com/apache/hbase/pull/2130#discussion_r464801901
########## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ########## @@ -2931,6 +2935,27 @@ public GetActiveMasterResponse getActiveMaster(RpcController rpcController, return resp.build(); } + @Override + public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request) + throws ServiceException { + GetMastersResponse.Builder resp = GetMastersResponse.newBuilder(); + // Active master + Optional<ServerName> serverName = master.getActiveMaster(); + serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder() + .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build())); + // Backup masters + try { + // TODO: Cache the backup masters to avoid a ZK RPC for each getMasters() call. Review comment: Right.
########## File path: hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java ########## @@ -121,6 +121,11 @@ public boolean hasCellBlockSupport() { @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback<Message> done) { + if (!method.getName().equals("GetClusterId")) { + // Master registry internally runs other RPCs to keep the master list up to date. This check Review comment: Will add more detail. That is needed because of the way the test is written. This RpcChannel implementation intercepts all the mock RPCs from unit tests and then just counts the getClusterId calls (depending on the index). With the patch, a single GetClusterId() RPC failure can trigger an extra getMasters() call, and that is counted too.
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -115,20 +129,50 @@ MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set<ServerName> masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + Runnable masterEndPointRefresher = () -> { + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(WAIT_TIME_OUT_MS); + } + LOG.debug("Attempting to refresh master address end points."); + Set<ServerName> newMasters = new HashSet<>(getMasters().get()); + populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); + break; + } catch (ExecutionException | IOException e) { + LOG.debug("Error populating latest list of masters.", e); + } + } + }; + masterAddrRefresherThread = Threads.newDaemonThreadFactory( + "MasterRegistry refresh end-points").newThread(masterEndPointRefresher); + masterAddrRefresherThread.start(); Review comment: Ok switched. I didn't want to have extra layers on top of a simple thread, but I guess a pool is more readable.
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -115,20 +129,50 @@ MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set<ServerName> masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + Runnable masterEndPointRefresher = () -> { + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(WAIT_TIME_OUT_MS); + } + LOG.debug("Attempting to refresh master address end points."); + Set<ServerName> newMasters = new HashSet<>(getMasters().get()); + populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); + break; + } catch (ExecutionException | IOException e) { + LOG.debug("Error populating latest list of masters.", e); + } + } + }; + masterAddrRefresherThread = Threads.newDaemonThreadFactory( + "MasterRegistry refresh end-points").newThread(masterEndPointRefresher); + masterAddrRefresherThread.start(); Review comment: There is a start() in the c'tor? (It's hiding between the long comment chains.) Anyway, I switched to a single-threaded pool per the suggestions. Hopefully it's clearer.
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -115,20 +129,50 @@ MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set<ServerName> masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + Runnable masterEndPointRefresher = () -> { + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(WAIT_TIME_OUT_MS); + } + LOG.debug("Attempting to refresh master address end points."); + Set<ServerName> newMasters = new HashSet<>(getMasters().get()); + populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); + break; + } catch (ExecutionException | IOException e) { + LOG.debug("Error populating latest list of masters.", e); + } + } + }; + masterAddrRefresherThread = Threads.newDaemonThreadFactory( + "MasterRegistry refresh end-points").newThread(masterEndPointRefresher); Review comment: Ok switched. I think we should use our internal one, but fine. ########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -115,20 +129,50 @@ MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set<ServerName> masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + Runnable masterEndPointRefresher = () -> { + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. 
We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(WAIT_TIME_OUT_MS); + } + LOG.debug("Attempting to refresh master address end points."); + Set<ServerName> newMasters = new HashSet<>(getMasters().get()); + populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); + break; + } catch (ExecutionException | IOException e) { + LOG.debug("Error populating latest list of masters.", e); + } + } + }; + masterAddrRefresherThread = Threads.newDaemonThreadFactory( + "MasterRegistry refresh end-points").newThread(masterEndPointRefresher); Review comment: You mean between words?
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti callable.call(controller, stub, resp -> { if (controller.failed()) { future.completeExceptionally(controller.getFailed()); + // RPC has failed, trigger a refresh of master end points. We can have some spurious Review comment: My thinking was that since the RPC is relatively inexpensive and is not in any hot path, we could refresh it for any exception. One small optimization we could do is to do it only for "connect" exceptions (timeouts/closed channel etc.). > RPC failure results in more RPCs has me suspicious... Hmm. It's only on a best-effort basis that we try to keep the list up to date. If no master is reachable, I think we will be in deeper trouble. I see your point though. Do you think we should have some kind of back-off on this thread if we are in that state where all the RPCs are failing and we have repeated getMasters() lookups?
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -89,11 +97,17 @@ private final int hedgedReadFanOut; // Configured list of masters to probe the meta information from. - private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub; + private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub; // RPC client used to talk to the masters. private final RpcClient rpcClient; private final RpcControllerFactory rpcControllerFactory; + private final int rpcTimeoutMs; + // For synchronizing on refreshing the master end-points + private final Object refreshMasters = new Object(); Review comment: No, it's loosely tied to a single connection (there are some places that create on-the-fly registries, which is something that can be fixed). A single application can connect to multiple clusters, which means we cannot make it static.
########## File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java ########## @@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception { } } } + + /** + * Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the + * event of errors. + */ + @Test + public void testDynamicMasterConfigurationRefresh() throws Exception { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY)); + HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); + String clusterId = activeMaster.getClusterId(); + // Add a non-working master + ServerName badServer = ServerName.valueOf("localhost", 1234, -1); + conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + currentMasterAddrs); + // Set the hedging fan out so that all masters are queried. + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4); + try (MasterRegistry registry = new MasterRegistry(conf)) { + final Set<ServerName> masters = registry.getParsedMasterServers(); + assertTrue(masters.contains(badServer)); + // Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails. + assertEquals(registry.getClusterId().get(), clusterId); + // Wait for new set of masters to be populated. + TEST_UTIL.waitFor(5000, + (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters)); Review comment: Skipped it because it bloats up the code; we would need to expand the lambda. I think the intent there is pretty clear if the test fails. Let me know if you feel strongly and I can change it (a rough sketch of the expanded form is at the end of this reply).
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -115,20 +129,50 @@ MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set<ServerName> masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + Runnable masterEndPointRefresher = () -> { + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(WAIT_TIME_OUT_MS); + } + LOG.debug("Attempting to refresh master address end points."); + Set<ServerName> newMasters = new HashSet<>(getMasters().get()); + populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); Review comment: Done.
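Following up on the TEST_UTIL.waitFor() discussion above: the expanded form I was trying to avoid would look roughly like the sketch below. It is illustrative only; it assumes Waiter.ExplainingPredicate and the failure message wording is made up.

    // Sketch of the expanded predicate with an explanation message (not in the patch).
    TEST_UTIL.waitFor(5000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() {
        // Passes once the registry has swapped out the seed list for the refreshed one.
        return !registry.getParsedMasterServers().equals(masters);
      }

      @Override
      public String explainFailure() {
        return "Master end points were not refreshed, still: " + registry.getParsedMasterServers();
      }
    });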
########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -115,20 +129,50 @@ MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set<ServerName> masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + Runnable masterEndPointRefresher = () -> { + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(WAIT_TIME_OUT_MS); + } + LOG.debug("Attempting to refresh master address end points."); + Set<ServerName> newMasters = new HashSet<>(getMasters().get()); + populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); Review comment: > Should this interrupt be accompanied by a shutdown of the managing thread pool? This is InterruptedException thrown by wait() call. try-catch seems like the right way to do it? (unless I misunderstood what you are suggesting). ########## File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java ########## @@ -115,20 +129,50 @@ MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set<ServerName> masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + Runnable masterEndPointRefresher = () -> { + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. 
+ synchronized (refreshMasters) { + refreshMasters.wait(WAIT_TIME_OUT_MS); + } + LOG.debug("Attempting to refresh master address end points."); + Set<ServerName> newMasters = new HashSet<>(getMasters().get()); Review comment: I haven't thought about this. Are you talking about this RPC specifically? If so, may I ask why? Adding it on the server makes sense to me, e.g. if we want some metrics around where most of the time is spent (grouped by RPC), but I'm curious what purpose it serves on the client.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org