HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28c7315e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28c7315e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28c7315e Branch: refs/heads/branch-1.4 Commit: 28c7315e0b1db7e7fd30ba996b8735ac2d805756 Parents: 7a9e1dd Author: Andrew Purtell <apurt...@apache.org> Authored: Fri Nov 3 15:03:08 2017 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri Nov 3 15:08:46 2017 -0700 ---------------------------------------------------------------------- .../hbase/rsgroup/RSGroupInfoManager.java | 4 + .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 90 ++++++++++++++++++-- 2 files changed, 89 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/28c7315e/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java index ab423e9..2330605 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java @@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address; */ @InterfaceAudience.Private public interface RSGroupInfoManager { + + String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait"; + long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L; + //Assigned before user tables TableName RSGROUP_TABLE_NAME = TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup"); http://git-wip-us.apache.org/repos/asf/hbase/blob/28c7315e/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 80eaefb..cfaa632 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.ServerListener; import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -81,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; import org.apache.hadoop.hbase.security.access.AccessControlLists; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -119,6 +121,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene private volatile Set<String> prevRSGroups; private RSGroupSerDe rsGroupSerDe; private DefaultServerUpdater defaultServerUpdater; + private FailedOpenUpdater failedOpenUpdater; private boolean isInit = false; public RSGroupInfoManagerImpl(MasterServices master) throws IOException { @@ -136,8 +139,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene refresh(); rsGroupStartupWorker.start(); defaultServerUpdater = new DefaultServerUpdater(this); + Threads.setDaemonThreadRunning(defaultServerUpdater); + failedOpenUpdater = new FailedOpenUpdater(this); + Threads.setDaemonThreadRunning(failedOpenUpdater); master.getServerManager().registerListener(this); - defaultServerUpdater.start(); isInit = true; } @@ -493,6 +498,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene @Override public void serverAdded(ServerName serverName) { defaultServerUpdater.serverChanged(); + failedOpenUpdater.serverChanged(); } @Override @@ -503,18 +509,22 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene private static class DefaultServerUpdater extends Thread { private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class); private RSGroupInfoManagerImpl mgr; - private boolean hasChanged = false; + private volatile boolean hasChanged = false; public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) { this.mgr = mgr; + setName(DefaultServerUpdater.class.getName()+"-" + mgr.master.getServerName()); + setDaemon(true); } @Override public void run() { List<Address> prevDefaultServers = new LinkedList<Address>(); - while(!mgr.master.isAborted() || !mgr.master.isStopped()) { + while (!mgr.master.isAborted() && !mgr.master.isStopped()) { try { - LOG.info("Updating default servers."); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating default servers"); + } List<Address> servers = mgr.getDefaultServers(); Collections.sort(servers, new Comparator<Address>() { @Override @@ -533,12 +543,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } try { synchronized (this) { - if(!hasChanged) { + while (!hasChanged) { wait(); } hasChanged = false; } } catch (InterruptedException e) { + LOG.warn("Interrupted", e); } } catch (IOException e) { LOG.warn("Failed to update default servers", e); @@ -546,6 +557,75 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } } + // Called for both server additions and removals + public void serverChanged() { + synchronized (this) { + hasChanged = true; + this.notify(); + } + } + } + + private static class FailedOpenUpdater extends Thread { + private static final Log LOG = LogFactory.getLog(FailedOpenUpdater.class); + + private final RSGroupInfoManagerImpl mgr; + private final long waitInterval; + private volatile boolean hasChanged = false; + + public FailedOpenUpdater(RSGroupInfoManagerImpl mgr) { + this.mgr = mgr; + this.waitInterval = mgr.master.getConfiguration().getLong(REASSIGN_WAIT_INTERVAL_KEY, + DEFAULT_REASSIGN_WAIT_INTERVAL); + setName(FailedOpenUpdater.class.getName()+"-" + mgr.master.getServerName()); + setDaemon(true); + } + + @Override + public void run() { + while (!mgr.master.isAborted() && !mgr.master.isStopped()) { + boolean interrupted = false; + try { + synchronized (this) { + while (!hasChanged) { + wait(); + } + hasChanged = false; + } + } catch (InterruptedException e) { + LOG.warn("Interrupted", e); + interrupted = true; + } + if (mgr.master.isAborted() || mgr.master.isStopped() || interrupted) { + continue; + } + + // First, wait a while in case more servers are about to rejoin the cluster + try { + Thread.sleep(waitInterval); + } catch (InterruptedException e) { + LOG.warn("Interrupted", e); + } + if (mgr.master.isAborted() || mgr.master.isStopped()) { + continue; + } + + // Kick all regions in FAILED_OPEN state + List<HRegionInfo> failedAssignments = Lists.newArrayList(); + for (RegionState state: + mgr.master.getAssignmentManager().getRegionStates().getRegionsInTransition()) { + if (state.isFailedOpen()) { + failedAssignments.add(state.getRegion()); + } + } + for (HRegionInfo region: failedAssignments) { + LOG.info("Retrying assignment of " + region); + mgr.master.getAssignmentManager().unassign(region); + } + } + } + + // Only called for server additions public void serverChanged() { synchronized (this) { hasChanged = true;