http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
new file mode 100644
index 0000000..3a2a6d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MultiHConnection;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A helper to persist region state in meta. We may change this class
+ * to StateStore later if we also use it to store other states in meta
+ */
+@InterfaceAudience.Private
+public class RegionStateStore {
+  private static final Log LOG = LogFactory.getLog(RegionStateStore.class);
+
+  /** The delimiter for meta columns for replicaIds > 0 */
+  protected static final char META_REPLICA_ID_DELIMITER = '_';
+
+  private volatile Region metaRegion;
+  private volatile boolean initialized;
+  private MultiHConnection multiHConnection;
+  private final MasterServices server;
+
+  /**
+   * Returns the {@link ServerName} from catalog table {@link Result}
+   * where the region is transitioning. It should be the same as
+   * {@link MetaTableAccessor#getServerName(Result,int)} if the server is at OPEN state.
+   * @param r Result to pull the transitioning server name from
+   * @return A ServerName instance or {@link MetaTableAccessor#getServerName(Result,int)}
+   * if the necessary fields are not found or empty.
+   */
+  static ServerName getRegionServer(final Result r, int replicaId) {
+    Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId));
+    if (cell == null || cell.getValueLength() == 0) {
+      RegionLocations locations = MetaTableAccessor.getRegionLocations(r);
+      if (locations != null) {
+        HRegionLocation location = locations.getRegionLocation(replicaId);
+        if (location != null) {
+          return location.getServerName();
+        }
+      }
+      return null;
+    }
+    return ServerName.parseServerName(Bytes.toString(cell.getValueArray(),
+      cell.getValueOffset(), cell.getValueLength()));
+  }
+
+  private static byte[] getServerNameColumn(int replicaId) {
+    return replicaId == 0
+        ? HConstants.SERVERNAME_QUALIFIER
+        : Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+          + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+  }
+
+  /**
+   * Pull the region state from a catalog table {@link Result}.
+   * @param r Result to pull the region state from
+   * @return the region state, or OPEN if there's no value written.
+   */
+  static State getRegionState(final Result r, int replicaId) {
+    Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getStateColumn(replicaId));
+    if (cell == null || cell.getValueLength() == 0) return State.OPEN;
+    return State.valueOf(Bytes.toString(cell.getValueArray(),
+      cell.getValueOffset(), cell.getValueLength()));
+  }
+
+  private static byte[] getStateColumn(int replicaId) {
+    return replicaId == 0
+        ? HConstants.STATE_QUALIFIER
+        : Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+          + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+  }
+
+  /**
+   * Check if we should persist a state change in meta. Generally it's
+   * better to persist all state changes. However, we should not do that
+   * if the region is not in meta at all. Based on the state and the
+   * previous state, we can identify if a user region has an entry
+   * in meta. For example, merged regions are deleted from meta;
+   * new merging parents or splitting daughters are
+   * not created in meta yet.
+   */
+  private boolean shouldPersistStateChange(
+      HRegionInfo hri, RegionState state, RegionState oldState) {
+    return !hri.isMetaRegion() && !RegionStates.isOneOfStates(
+      state, State.MERGING_NEW, State.SPLITTING_NEW, State.MERGED)
+      && !(RegionStates.isOneOfStates(state, State.OFFLINE)
+        && RegionStates.isOneOfStates(oldState, State.MERGING_NEW,
+          State.SPLITTING_NEW, State.MERGED));
+  }
+
+  RegionStateStore(final MasterServices server) {
+    this.server = server;
+    initialized = false;
+  }
+
+  void start() throws IOException {
+    if (server instanceof RegionServerServices) {
+      metaRegion = ((RegionServerServices)server).getFromOnlineRegions(
+        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+    }
+    // When meta is not colocated on master
+    if (metaRegion == null) {
+      Configuration conf = server.getConfiguration();
+      // Config to determine the number of HConnections to META.
+      // A single Connection should be sufficient in most cases. Only if
+      // you are doing a lot of writes (>1M) to META,
+      // increasing this value might improve the write throughput.
+      multiHConnection =
+          new MultiHConnection(conf, conf.getInt("hbase.regionstatestore.meta.connection", 1));
+    }
+    initialized = true;
+  }
+
+  void stop() {
+    initialized = false;
+    if (multiHConnection != null) {
+      multiHConnection.close();
+    }
+  }
+
+  void updateRegionState(long openSeqNum,
+      RegionState newState, RegionState oldState) {
+    try {
+      HRegionInfo hri = newState.getRegion();
+
+      // Update meta before checking for initialization. Meta state is stored in zk.
+      if (hri.isMetaRegion()) {
+        // persist meta state in MetaTableLocator (which in turn is zk storage currently)
+        try {
+          MetaTableLocator.setMetaLocation(server.getZooKeeper(),
+            newState.getServerName(), hri.getReplicaId(), newState.getState());
+          return; // Done
+        } catch (KeeperException e) {
+          throw new IOException("Failed to update meta ZNode", e);
+        }
+      }
+
+      if (!initialized
+          || !shouldPersistStateChange(hri, newState, oldState)) {
+        return;
+      }
+
+      ServerName oldServer = oldState != null ? oldState.getServerName() : null;
+      ServerName serverName = newState.getServerName();
+      State state = newState.getState();
+
+      int replicaId = hri.getReplicaId();
+      Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+      StringBuilder info = new StringBuilder("Updating hbase:meta row ");
+      info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
+      if (serverName != null && !serverName.equals(oldServer)) {
+        metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+          Bytes.toBytes(serverName.getServerName()));
+        info.append(", sn=").append(serverName);
+      }
+      if (openSeqNum >= 0) {
+        Preconditions.checkArgument(state == State.OPEN
+          && serverName != null, "Open region should be on a server");
+        MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
+        info.append(", openSeqNum=").append(openSeqNum);
+        info.append(", server=").append(serverName);
+      }
+      metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+        Bytes.toBytes(state.name()));
+      LOG.info(info);
+      HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
+      boolean serial = false;
+      if (descriptor != null) {
+        serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
+      }
+      boolean shouldPutBarrier = serial && state == State.OPEN;
+      // Persist the state change to meta
+      if (metaRegion != null) {
+        try {
+          // Assume meta is pinned to master.
+          // At least, that's what we want.
+          metaRegion.put(metaPut);
+          if (shouldPutBarrier) {
+            Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+                openSeqNum, hri.getTable().getName());
+            metaRegion.put(barrierPut);
+          }
+          return; // Done here
+        } catch (Throwable t) {
+          // In unit tests, meta could be intentionally moved away
+          // So, the shortcut is gone. We won't try to establish the
+          // shortcut any more because we prefer meta to be pinned
+          // to the master
+          synchronized (this) {
+            if (metaRegion != null) {
+              LOG.info("Meta region shortcut failed", t);
+              if (multiHConnection == null) {
+                multiHConnection = new MultiHConnection(server.getConfiguration(), 1);
+              }
+              metaRegion = null;
+            }
+          }
+        }
+      }
+      // Called when meta is not on master
+      List<Put> list = shouldPutBarrier ?
+          Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+              openSeqNum, hri.getTable().getName())) : Collections.singletonList(metaPut);
+      multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
+
+    } catch (IOException ioe) {
+      LOG.error("Failed to persist region state " + newState, ioe);
+      server.abort("Failed to update region location", ioe);
+    }
+  }
+
+  void splitRegion(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
+    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
+        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+  }
+
+  void mergeRegions(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
+    MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
+        EnvironmentEdgeManager.currentTime(),
+        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+  }
+}
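For context, a minimal self-contained sketch of the replica-aware qualifier naming used by getServerNameColumn()/getStateColumn() above: for replicas > 0 the base qualifier is suffixed with an underscore and the zero-padded hex replica id. The qualifier string and format constants below are assumed stand-ins for HConstants.SERVERNAME_QUALIFIER_STR and HRegionInfo.REPLICA_ID_FORMAT, so they may not match the real constants exactly; this is illustrative only, not part of the patch.

import java.nio.charset.StandardCharsets;

public class ReplicaColumnNaming {
  // Assumed values; the patch reads these from HConstants and HRegionInfo.
  static final String SERVERNAME_QUALIFIER_STR = "sn";
  static final String REPLICA_ID_FORMAT = "%04X";
  static final char META_REPLICA_ID_DELIMITER = '_';

  // Mirrors the idea of getServerNameColumn(): replica 0 uses the plain
  // qualifier, higher replicas get "<qualifier>_<hex replica id>".
  static byte[] serverNameColumn(int replicaId) {
    String qualifier = replicaId == 0
        ? SERVERNAME_QUALIFIER_STR
        : SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
            + String.format(REPLICA_ID_FORMAT, replicaId);
    return qualifier.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(new String(serverNameColumn(0), StandardCharsets.UTF_8)); // sn
    System.out.println(new String(serverNameColumn(2), StandardCharsets.UTF_8)); // sn_0002
  }
}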

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
new file mode 100644
index 0000000..dcbf5a4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -0,0 +1,1170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Region state accountant. It holds the states of all regions in memory.
+ * In a normal scenario, it should match the meta table and the true region states.
+ *
+ * This map is used by AssignmentManager to track region states.
+ */
+@InterfaceAudience.Private
+public class RegionStates {
+  private static final Log LOG = LogFactory.getLog(RegionStates.class);
+
+  public final static RegionStateStampComparator REGION_STATE_COMPARATOR =
+    new RegionStateStampComparator();
+
+  // This comparator sorts the RegionStates by time stamp then Region name.
+  // Comparing by timestamp alone can lead us to discard different RegionStates that happen
+  // to share a timestamp.
+  private static class RegionStateStampComparator implements Comparator<RegionState> {
+    @Override
+    public int compare(RegionState l, RegionState r) {
+      return Long.compare(l.getStamp(), r.getStamp()) == 0 ?
+          Bytes.compareTo(l.getRegion().getRegionName(), r.getRegion().getRegionName()) :
+          Long.compare(l.getStamp(), r.getStamp());
+    }
+  }
+
+  /**
+   * Regions currently in transition.
+   */
+  final HashMap<String, RegionState> regionsInTransition = new HashMap<>();
+
+  /**
+   * Region encoded name to state map.
+   * All the regions should be in this map.
+   */
+  private final Map<String, RegionState> regionStates = new HashMap<>();
+
+  /**
+   * Holds mapping of table -> region state
+   */
+  private final Map<TableName, Map<String, RegionState>> regionStatesTableIndex = new HashMap<>();
+
+  /**
+   * Server to regions assignment map.
+   * Contains the set of regions currently assigned to a given server.
+   */
+  private final Map<ServerName, Set<HRegionInfo>> serverHoldings = new HashMap<>();
+
+  /**
+   * Maintains the mapping from the default region to the replica regions.
+   */
+  private final Map<HRegionInfo, Set<HRegionInfo>> defaultReplicaToOtherReplicas = new HashMap<>();
+
+  /**
+   * Region to server assignment map.
+   * Contains the server a given region is currently assigned to.
+   */
+  private final TreeMap<HRegionInfo, ServerName> regionAssignments = new TreeMap<>();
+
+  /**
+   * Encoded region name to server assignment map for re-assignment
+   * purpose. Contains the server a given region is last known assigned
+   * to, which has not completed log splitting, so not assignable.
+   * If a region is currently assigned, this server info in this
+   * map should be the same as that in regionAssignments.
+   * However the info in regionAssignments is cleared when the region
+   * is offline while the info in lastAssignments is cleared when
+   * the region is closed or the server is dead and processed.
+   */
+  private final HashMap<String, ServerName> lastAssignments = new HashMap<>();
+
+  /**
+   * Encoded region name to server assignment map for the
+   * purpose of cleaning up serverHoldings when a region is online
+   * on a new server. When the region is offline from the previous
+   * server, we cleaned up regionAssignments so that it has the
+   * latest assignment map. But we didn't clean up serverHoldings
+   * to match the meta. We need this map to find out the old server
+   * whose serverHoldings needs cleanup, given a moved region.
+   */
+  private final HashMap<String, ServerName> oldAssignments = new HashMap<>();
+
+  /**
+   * Map a host port pair string to the latest start code
+   * of a region server which is known to be dead. It is dead
+   * to us, but server manager may not know it yet.
+   */
+  private final HashMap<String, Long> deadServers = new HashMap<>();
+
+  /**
+   * Map dead servers to the time when log split is done.
+   * Since log splitting is not ordered, we have to remember
+   * all processed instances. The map is cleaned up based
+   * on a configured time. By default, we assume a dead
+   * server should be done with log splitting in two hours.
+   */
+  private final HashMap<ServerName, Long> processedServers = new HashMap<>();
+  private long lastProcessedServerCleanTime;
+
+  private final TableStateManager tableStateManager;
+  private final RegionStateStore regionStateStore;
+  private final ServerManager serverManager;
+  private final MasterServices server;
+
+  // The maximum time to keep a log split info in region states map
+  static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
+  static final long DEFAULT_LOG_SPLIT_TIME = 7200000L; // 2 hours
+
+  RegionStates(final MasterServices master, final TableStateManager tableStateManager,
+      final ServerManager serverManager, final RegionStateStore regionStateStore) {
+    this.tableStateManager = tableStateManager;
+    this.regionStateStore = regionStateStore;
+    this.serverManager = serverManager;
+    this.server = master;
+  }
+
+  /**
+   * @return a copy of the region assignment map
+   */
+  public synchronized Map<HRegionInfo, ServerName> getRegionAssignments() {
+    return new TreeMap<>(regionAssignments);
+  }
+
+  /**
+   * Return the replicas (including default) for the regions grouped by ServerName
+   * @param regions
+   * @return a pair containing the groupings as a map
+   */
+  synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
+    Collection<HRegionInfo> regions) {
+    Map<ServerName, List<HRegionInfo>> map = new HashMap<>();
+    for (HRegionInfo region : regions) {
+      HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
+      Set<HRegionInfo> allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica);
+      if (allReplicas != null) {
+        for (HRegionInfo hri : allReplicas) {
+          ServerName server = regionAssignments.get(hri);
+          if (server != null) {
+            List<HRegionInfo> regionsOnServer = map.get(server);
+            if (regionsOnServer == null) {
+              regionsOnServer = new ArrayList<>(1);
+              map.put(server, regionsOnServer);
+            }
+            regionsOnServer.add(hri);
+          }
+        }
+      }
+    }
+    return map;
+  }
+
+  public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) {
+    return regionAssignments.get(hri);
+  }
+
+  /**
+   * Get regions in transition and their states
+   */
+  public synchronized Set<RegionState> getRegionsInTransition() {
+    return new HashSet<>(regionsInTransition.values());
+  }
+
+  public synchronized SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
+    final TreeSet<RegionState> rit = new TreeSet<>(REGION_STATE_COMPARATOR);
+    for (RegionState rs: regionsInTransition.values()) {
+      rit.add(rs);
+    }
+    return rit;
+  }
+
+  /**
+   * Get the number of regions in transition.
+   */
+  public synchronized int getRegionsInTransitionCount() {
+    return regionsInTransition.size();
+  }
+
+  /**
+   * @return True if specified region in transition.
+   */
+  public synchronized boolean isRegionInTransition(final HRegionInfo hri) {
+    return regionsInTransition.containsKey(hri.getEncodedName());
+  }
+
+  /**
+   * @return True if specified region in transition.
+   */
+  public synchronized boolean isRegionInTransition(final String encodedName) {
+    return regionsInTransition.containsKey(encodedName);
+  }
+
+  /**
+   * @return True if any region in transition.
+   */
+  public synchronized boolean isRegionsInTransition() {
+    return !regionsInTransition.isEmpty();
+  }
+
+  /**
+   * @return True if hbase:meta table region is in transition.
+   */
+  public synchronized boolean isMetaRegionInTransition() {
+    for (RegionState state : regionsInTransition.values()) {
+      if (state.getRegion().isMetaRegion()) return true;
+    }
+    return false;
+  }
+
+  /**
+   * @return True if specified region assigned, and not in transition.
+   */
+  public synchronized boolean isRegionOnline(final HRegionInfo hri) {
+    return !isRegionInTransition(hri) && regionAssignments.containsKey(hri);
+  }
+
+  /**
+   * @return True if specified region offline/closed, but not in transition.
+   * If the region is not in the map, it is offline to us too.
+   */
+  public synchronized boolean isRegionOffline(final HRegionInfo hri) {
+    return getRegionState(hri) == null || (!isRegionInTransition(hri)
+      && isRegionInState(hri, State.OFFLINE, State.CLOSED));
+  }
+
+  /**
+   * @return True if specified region is in one of the specified states.
+   */
+  public boolean isRegionInState(
+      final HRegionInfo hri, final State... states) {
+    return isRegionInState(hri.getEncodedName(), states);
+  }
+
+  /**
+   * @return True if specified region is in one of the specified states.
+   */
+  public boolean isRegionInState(
+      final String encodedName, final State... states) {
+    RegionState regionState = getRegionState(encodedName);
+    return isOneOfStates(regionState, states);
+  }
+
+  /**
+   * Wait for the state map to be updated by assignment manager.
+   */
+  public synchronized void waitForUpdate(
+      final long timeout) throws InterruptedException {
+    this.wait(timeout);
+  }
+
+  /**
+   * Get region transition state
+   */
+  public RegionState getRegionTransitionState(final HRegionInfo hri) {
+    return getRegionTransitionState(hri.getEncodedName());
+  }
+
+  /**
+   * Get region transition state
+   */
+  public synchronized RegionState
+      getRegionTransitionState(final String encodedName) {
+    return regionsInTransition.get(encodedName);
+  }
+
+  /**
+   * Add a list of regions to RegionStates. If a region is split
+   * and offline, its state will be SPLIT. Otherwise, its state will
+   * be OFFLINE. Region already in RegionStates will be skipped.
+   */
+  public void createRegionStates(
+      final List<HRegionInfo> hris) {
+    for (HRegionInfo hri: hris) {
+      createRegionState(hri);
+    }
+  }
+
+  /**
+   * Add a region to RegionStates. If the region is split
+   * and offline, its state will be SPLIT. Otherwise, its state will
+   * be OFFLINE. If it is already in RegionStates, this call has
+   * no effect, and the original state is returned.
+   */
+  public RegionState createRegionState(final HRegionInfo hri) {
+    return createRegionState(hri, null, null, null);
+  }
+
+  /**
+   * Add a region to RegionStates with the specified state.
+   * If the region is already in RegionStates, this call has
+   * no effect, and the original state is returned.
+   *
+   * @param hri the region info to create a state for
+   * @param newState the state to set the region to
+   * @param serverName the server the region is transitioning on
+   * @param lastHost the last server that hosts the region
+   * @return the current state
+   */
+  public synchronized RegionState createRegionState(final HRegionInfo hri,
+      State newState, ServerName serverName, ServerName lastHost) {
+    if (newState == null || (newState == State.OPEN && serverName == null)) {
+      newState =  State.OFFLINE;
+    }
+    if (hri.isOffline() && hri.isSplit()) {
+      newState = State.SPLIT;
+      serverName = null;
+    }
+    String encodedName = hri.getEncodedName();
+    RegionState regionState = regionStates.get(encodedName);
+    if (regionState != null) {
+      LOG.warn("Tried to create a state for a region already in RegionStates, "
+        + "used existing: " + regionState + ", ignored new: " + newState);
+    } else {
+      regionState = new RegionState(hri, newState, serverName);
+      putRegionState(regionState);
+      if (newState == State.OPEN) {
+        if (!serverName.equals(lastHost)) {
+          LOG.warn("Open region's last host " + lastHost
+            + " should be the same as the current one " + serverName
+            + ", ignored the last and used the current one");
+          lastHost = serverName;
+        }
+        lastAssignments.put(encodedName, lastHost);
+        regionAssignments.put(hri, lastHost);
+      } else if (!isOneOfStates(regionState, State.MERGED, State.SPLIT, State.OFFLINE)) {
+        regionsInTransition.put(encodedName, regionState);
+      }
+      if (lastHost != null && newState != State.SPLIT) {
+        addToServerHoldings(lastHost, hri);
+        if (newState != State.OPEN) {
+          oldAssignments.put(encodedName, lastHost);
+        }
+      }
+    }
+    return regionState;
+  }
+
+  private RegionState putRegionState(RegionState regionState) {
+    HRegionInfo hri = regionState.getRegion();
+    String encodedName = hri.getEncodedName();
+    TableName table = hri.getTable();
+    RegionState oldState = regionStates.put(encodedName, regionState);
+    Map<String, RegionState> map = regionStatesTableIndex.get(table);
+    if (map == null) {
+      map = new HashMap<>();
+      regionStatesTableIndex.put(table, map);
+    }
+    map.put(encodedName, regionState);
+    return oldState;
+  }
+
+  /**
+   * Update a region state. It will be put in transition if not already there.
+   */
+  public RegionState updateRegionState(
+      final HRegionInfo hri, final State state) {
+    RegionState regionState = getRegionState(hri.getEncodedName());
+    return updateRegionState(hri, state,
+      regionState == null ? null : regionState.getServerName());
+  }
+
+  /**
+   * Update a region state. It will be put in transition if not already there.
+   */
+  public RegionState updateRegionState(
+      final HRegionInfo hri, final State state, final ServerName serverName) {
+    return updateRegionState(hri, state, serverName, HConstants.NO_SEQNUM);
+  }
+
+  public void regionOnline(final HRegionInfo hri, final ServerName serverName) {
+    regionOnline(hri, serverName, HConstants.NO_SEQNUM);
+  }
+
+  /**
+   * A region is online, won't be in transition any more.
+   * We can't confirm it is really online on specified region server
+   * because it hasn't been put in region server's online region list yet.
+   */
+  public void regionOnline(final HRegionInfo hri, final ServerName serverName, long openSeqNum) {
+    String encodedName = hri.getEncodedName();
+    if (!serverManager.isServerOnline(serverName)) {
+      // This is possible if the region server dies before master gets a
+      // chance to handle ZK event in time. At this time, if the dead server
+      // is already processed by SSH, we should ignore this event.
+      // If not processed yet, ignore and let SSH deal with it.
+      LOG.warn("Ignored, " + encodedName + " was opened on a dead server: " + 
serverName);
+      return;
+    }
+    updateRegionState(hri, State.OPEN, serverName, openSeqNum);
+
+    synchronized (this) {
+      RegionState regionState = regionsInTransition.remove(encodedName);
+      // When the region is online and removed from regionsInTransition,
+      // update the RIT duration in the assignment manager metrics.
+      if (regionState != null && this.server.getAssignmentManager() != null) {
+        long ritDuration = System.currentTimeMillis() - regionState.getStamp()
+            + regionState.getRitDuration();
+        this.server.getAssignmentManager().getAssignmentManagerMetrics()
+            .updateRitDuration(ritDuration);
+      }
+      ServerName oldServerName = regionAssignments.put(hri, serverName);
+      if (!serverName.equals(oldServerName)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + 
serverName);
+        }
+        addToServerHoldings(serverName, hri);
+        addToReplicaMapping(hri);
+        if (oldServerName == null) {
+          oldServerName = oldAssignments.remove(encodedName);
+        }
+        if (oldServerName != null
+            && !oldServerName.equals(serverName)
+            && serverHoldings.containsKey(oldServerName)) {
+          LOG.info("Offlined " + hri.getShortNameToLog() + " from " + 
oldServerName);
+          removeFromServerHoldings(oldServerName, hri);
+        }
+      }
+    }
+  }
+
+  private void addToServerHoldings(ServerName serverName, HRegionInfo hri) {
+    Set<HRegionInfo> regions = serverHoldings.get(serverName);
+    if (regions == null) {
+      regions = new HashSet<>();
+      serverHoldings.put(serverName, regions);
+    }
+    regions.add(hri);
+  }
+
+  private void addToReplicaMapping(HRegionInfo hri) {
+    HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
+    Set<HRegionInfo> replicas =
+        defaultReplicaToOtherReplicas.get(defaultReplica);
+    if (replicas == null) {
+      replicas = new HashSet<>();
+      defaultReplicaToOtherReplicas.put(defaultReplica, replicas);
+    }
+    replicas.add(hri);
+  }
+
+  private void removeFromServerHoldings(ServerName serverName, HRegionInfo hri) {
+    Set<HRegionInfo> oldRegions = serverHoldings.get(serverName);
+    oldRegions.remove(hri);
+    if (oldRegions.isEmpty()) {
+      serverHoldings.remove(serverName);
+    }
+  }
+
+  private void removeFromReplicaMapping(HRegionInfo hri) {
+    HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
+    Set<HRegionInfo> replicas = defaultReplicaToOtherReplicas.get(defaultReplica);
+    if (replicas != null) {
+      replicas.remove(hri);
+      if (replicas.isEmpty()) {
+        defaultReplicaToOtherReplicas.remove(defaultReplica);
+      }
+    }
+  }
+
+  /**
+   * A dead server's wals have been split so that all the regions
+   * used to be open on it can be safely assigned now. Mark them assignable.
+   */
+  public synchronized void logSplit(final ServerName serverName) {
+    for (Iterator<Map.Entry<String, ServerName>> it
+        = lastAssignments.entrySet().iterator(); it.hasNext();) {
+      Map.Entry<String, ServerName> e = it.next();
+      if (e.getValue().equals(serverName)) {
+        it.remove();
+      }
+    }
+    long now = System.currentTimeMillis();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding to log splitting servers " + serverName);
+    }
+    processedServers.put(serverName, Long.valueOf(now));
+    Configuration conf = server.getConfiguration();
+    long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME);
+    // Doesn't have to be very accurate about the clean up time
+    if (now > lastProcessedServerCleanTime + obsoleteTime) {
+      lastProcessedServerCleanTime = now;
+      long cutoff = now - obsoleteTime;
+      for (Iterator<Map.Entry<ServerName, Long>> it
+          = processedServers.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<ServerName, Long> e = it.next();
+        if (e.getValue().longValue() < cutoff) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removed from log splitting servers " + e.getKey());
+          }
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Log split is done for a given region, so it is assignable now.
+   */
+  public void logSplit(final HRegionInfo region) {
+    clearLastAssignment(region);
+  }
+
+  public synchronized void clearLastAssignment(final HRegionInfo region) {
+    lastAssignments.remove(region.getEncodedName());
+  }
+
+  /**
+   * A region is offline, won't be in transition any more.
+   */
+  public void regionOffline(final HRegionInfo hri) {
+    regionOffline(hri, null);
+  }
+
+  /**
+   * A region is offline, won't be in transition any more. Its state
+   * should be the specified expected state, which can only be
+   * Split/Merged/Offline/null(=Offline)/SplittingNew/MergingNew.
+   */
+  public void regionOffline(
+      final HRegionInfo hri, final State expectedState) {
+    Preconditions.checkArgument(expectedState == null
+      || RegionState.isUnassignable(expectedState),
+        "Offlined region should not be " + expectedState);
+    if (isRegionInState(hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
+      // Remove it from all region maps
+      deleteRegion(hri);
+      return;
+    }
+    State newState =
+      expectedState == null ? State.OFFLINE : expectedState;
+    updateRegionState(hri, newState);
+    String encodedName = hri.getEncodedName();
+    synchronized (this) {
+      regionsInTransition.remove(encodedName);
+      ServerName oldServerName = regionAssignments.remove(hri);
+      if (oldServerName != null && serverHoldings.containsKey(oldServerName)) {
+        if (newState == State.MERGED || newState == State.SPLIT
+            || hri.isMetaRegion() || tableStateManager.isTableState(hri.getTable(),
+              TableState.State.DISABLED, TableState.State.DISABLING)) {
+          // Offline the region only if it's merged/split, or the table is disabled/disabling.
+          // Otherwise, offline it from this server only when it is online on a different server.
+          LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
+          removeFromServerHoldings(oldServerName, hri);
+          removeFromReplicaMapping(hri);
+        } else {
+          // Need to remember it so that we can offline it from this
+          // server when it is online on a different server.
+          oldAssignments.put(encodedName, oldServerName);
+        }
+      }
+    }
+  }
+
+  /**
+   * A server is offline, all regions on it are dead.
+   */
+  public List<HRegionInfo> serverOffline(final ServerName sn) {
+    // Offline all regions on this server not already in transition.
+    List<HRegionInfo> rits = new ArrayList<>();
+    Set<HRegionInfo> regionsToCleanIfNoMetaEntry = new HashSet<>();
+    // Offline regions outside the loop and synchronized block to avoid
+    // ConcurrentModificationException and deadlock in case meta is unassigned
+    // but RegionStates is blocked.
+    Set<HRegionInfo> regionsToOffline = new HashSet<>();
+    synchronized (this) {
+      Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
+      if (assignedRegions == null) {
+        assignedRegions = new HashSet<>();
+      }
+
+      for (HRegionInfo region : assignedRegions) {
+        // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE
+        if (isRegionOnline(region)) {
+          regionsToOffline.add(region);
+        } else if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
+          LOG.debug("Offline splitting/merging region " + 
getRegionState(region));
+          regionsToOffline.add(region);
+        }
+      }
+
+      for (RegionState state : regionsInTransition.values()) {
+        HRegionInfo hri = state.getRegion();
+        if (assignedRegions.contains(hri)) {
+          // Region is open on this region server, but in transition.
+          // This region must be moving away from this server, or splitting/merging.
+          // SSH will handle it, either skip assigning, or re-assign.
+          LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn);
+        } else if (sn.equals(state.getServerName())) {
+          // Region is in transition on this region server, and this
+          // region is not open on this server. So the region must be
+          // moving to this server from another one (i.e. opening or
+          // pending open on this server, and was open on another one).
+          // Offline state is also kind of pending open if the region is in
+          // transition. The region could be in failed_close state too if we have
+          // tried several times to open it while this region server is not reachable.
+          if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN,
+              State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
+            LOG.info("Found region in " + state +
+              " to be reassigned by ServerCrashProcedure for " + sn);
+            rits.add(hri);
+          } else if (isOneOfStates(state, State.SPLITTING_NEW, State.MERGING_NEW)) {
+            regionsToCleanIfNoMetaEntry.add(state.getRegion());
+          } else {
+            LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state);
+          }
+        }
+      }
+      this.notifyAll();
+    }
+
+    for (HRegionInfo hri : regionsToOffline) {
+      regionOffline(hri);
+    }
+
+    cleanIfNoMetaEntry(regionsToCleanIfNoMetaEntry);
+    return rits;
+  }
+
+  /**
+   * This method does an RPC to hbase:meta. Do not call this method with a lock/synchronize held.
+   * @param hris The hris to check if empty in hbase:meta and if so, clean them up.
+   */
+  private void cleanIfNoMetaEntry(Set<HRegionInfo> hris) {
+    if (hris.isEmpty()) return;
+    for (HRegionInfo hri: hris) {
+      try {
+        // This is RPC to meta table. It is done while we have a synchronize on
+        // regionstates. No progress will be made if meta is not available at this time.
+        // This is a cleanup task. Not critical.
+        if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) ==
+            null) {
+          regionOffline(hri);
+          FSUtils.deleteRegionDir(server.getConfiguration(), hri);
+        }
+      } catch (IOException e) {
+        LOG.warn("Got exception while deleting " + hri + " directories from 
file system.", e);
+      }
+    }
+  }
+
+  /**
+   * Gets the online regions of the specified table.
+   * This method looks at the in-memory state.  It does not go to <code>hbase:meta</code>.
+   * Only returns <em>online</em> regions.  If a region on this table has been
+   * closed during a disable, etc., it will not be included in the returned list.
+   * So, the returned list may not necessarily be ALL regions in this table, it's
+   * all the ONLINE regions in the table.
+   * @param tableName
+   * @return Online regions from <code>tableName</code>
+   */
+  public synchronized List<HRegionInfo> getRegionsOfTable(TableName tableName) {
+    List<HRegionInfo> tableRegions = new ArrayList<>();
+    // boundary needs to have table's name but regionID 0 so that it is sorted
+    // before all table's regions.
+    HRegionInfo boundary = new HRegionInfo(tableName, null, null, false, 0L);
+    for (HRegionInfo hri: regionAssignments.tailMap(boundary).keySet()) {
+      if(!hri.getTable().equals(tableName)) break;
+      tableRegions.add(hri);
+    }
+    return tableRegions;
+  }
+
+  /**
+   * Gets current state of all regions of the table.
+   * This method looks at the in-memory state.  It does not go to <code>hbase:meta</code>.
+   * Method guaranteed to return keys for all states
+   * in {@link org.apache.hadoop.hbase.master.RegionState.State}
+   *
+   * @param tableName
+   * @return Map from region state to the regions of <code>tableName</code> in that state
+   */
+  public synchronized Map<RegionState.State, List<HRegionInfo>>
+  getRegionByStateOfTable(TableName tableName) {
+    Map<RegionState.State, List<HRegionInfo>> tableRegions = new HashMap<>();
+    for (State state : State.values()) {
+      tableRegions.put(state, new ArrayList<>());
+    }
+    Map<String, RegionState> indexMap = regionStatesTableIndex.get(tableName);
+    if (indexMap == null)
+      return tableRegions;
+    for (RegionState regionState : indexMap.values()) {
+      tableRegions.get(regionState.getState()).add(regionState.getRegion());
+    }
+    return tableRegions;
+  }
+
+  /**
+   * Wait on region to clear regions-in-transition.
+   * <p>
+   * If the region isn't in transition, returns immediately.  Otherwise, method
+   * blocks until the region is out of transition.
+   */
+  public synchronized void waitOnRegionToClearRegionsInTransition(
+      final HRegionInfo hri) throws InterruptedException {
+    if (!isRegionInTransition(hri)) return;
+
+    while(!server.isStopped() && isRegionInTransition(hri)) {
+      RegionState rs = getRegionState(hri);
+      LOG.info("Waiting on " + rs + " to clear regions-in-transition");
+      waitForUpdate(100);
+    }
+
+    if (server.isStopped()) {
+      LOG.info("Giving up wait on region in " +
+        "transition because stoppable.isStopped is set");
+    }
+  }
+
+  /**
+   * A table is deleted. Remove its regions from all internal maps.
+   * We loop through all regions assuming we don't delete tables too much.
+   */
+  public void tableDeleted(final TableName tableName) {
+    Set<HRegionInfo> regionsToDelete = new HashSet<>();
+    synchronized (this) {
+      for (RegionState state: regionStates.values()) {
+        HRegionInfo region = state.getRegion();
+        if (region.getTable().equals(tableName)) {
+          regionsToDelete.add(region);
+        }
+      }
+    }
+    for (HRegionInfo region: regionsToDelete) {
+      deleteRegion(region);
+    }
+  }
+
+  /**
+   * Get a copy of all regions assigned to a server
+   */
+  public synchronized Set<HRegionInfo> getServerRegions(ServerName serverName) {
+    Set<HRegionInfo> regions = serverHoldings.get(serverName);
+    if (regions == null) return null;
+    return new HashSet<>(regions);
+  }
+
+  /**
+   * Remove a region from all state maps.
+   */
+  @VisibleForTesting
+  public synchronized void deleteRegion(final HRegionInfo hri) {
+    String encodedName = hri.getEncodedName();
+    regionsInTransition.remove(encodedName);
+    regionStates.remove(encodedName);
+    TableName table = hri.getTable();
+    Map<String, RegionState> indexMap = regionStatesTableIndex.get(table);
+    indexMap.remove(encodedName);
+    if (indexMap.isEmpty())
+      regionStatesTableIndex.remove(table);
+    lastAssignments.remove(encodedName);
+    ServerName sn = regionAssignments.remove(hri);
+    if (sn != null) {
+      Set<HRegionInfo> regions = serverHoldings.get(sn);
+      regions.remove(hri);
+    }
+  }
+
+  /**
+   * Check if a region was assigned to a server which is not online now.
+   * If so, we should hold off re-assigning this region till SSH has split its wals.
+   * Once logs are split, the last assignment of this region will be reset,
+   * which means a null last assignment server is ok for re-assigning.
+   *
+   * A region server could be dead but we don't know it yet. We may
+   * falsely think it's online. Therefore if a server is online, we still
+   * need to confirm it is reachable and has the expected start code.
+   */
+  synchronized boolean wasRegionOnDeadServer(final String encodedName) {
+    ServerName server = lastAssignments.get(encodedName);
+    return isServerDeadAndNotProcessed(server);
+  }
+
+  synchronized boolean isServerDeadAndNotProcessed(ServerName server) {
+    if (server == null) return false;
+    if (serverManager.isServerOnline(server)) {
+      String hostAndPort = server.getHostAndPort();
+      long startCode = server.getStartcode();
+      Long deadCode = deadServers.get(hostAndPort);
+      if (deadCode == null || startCode > deadCode.longValue()) {
+        if (serverManager.isServerReachable(server)) {
+          return false;
+        }
+        // The size of deadServers won't grow unbounded.
+        deadServers.put(hostAndPort, Long.valueOf(startCode));
+      }
+      // Watch out! If the server is not dead, the region could
+      // remain unassigned. That's why ServerManager#isServerReachable
+      // should use some retry.
+      //
+      // We cache this info since it is very unlikely for that
+      // instance to come back up later on. We don't want to expire
+      // the server since we prefer to let it die naturally.
+      LOG.warn("Couldn't reach online server " + server);
+    }
+    // Now, we know it's dead. Check if it's processed
+    return !processedServers.containsKey(server);
+  }
+
+ /**
+   * Get the last region server a region was on for purpose of re-assignment,
+   * i.e. should the re-assignment be held back till log split is done?
+   */
+  synchronized ServerName getLastRegionServerOfRegion(final String encodedName) {
+    return lastAssignments.get(encodedName);
+  }
+
+  synchronized void setLastRegionServerOfRegions(
+      final ServerName serverName, final List<HRegionInfo> regionInfos) {
+    for (HRegionInfo hri: regionInfos) {
+      setLastRegionServerOfRegion(serverName, hri.getEncodedName());
+    }
+  }
+
+  synchronized void setLastRegionServerOfRegion(
+      final ServerName serverName, final String encodedName) {
+    lastAssignments.put(encodedName, serverName);
+  }
+
+  synchronized boolean isRegionOnServer(
+      final HRegionInfo hri, final ServerName serverName) {
+    Set<HRegionInfo> regions = serverHoldings.get(serverName);
+    return regions == null ? false : regions.contains(hri);
+  }
+
+  public void prepareAssignDaughters(HRegionInfo a, HRegionInfo b) {
+     synchronized (this) {
+       if (isRegionInState(a, State.SPLITTING_NEW)) {
+         updateRegionState(a, State.OFFLINE, null);
+       }
+       if (isRegionInState(b, State.SPLITTING_NEW)) {
+         updateRegionState(b, State.OFFLINE, null);
+       }
+     }
+   }
+
+  public void prepareAssignMergedRegion(HRegionInfo mergedRegion) {
+    synchronized (this) {
+      if (isRegionInState(mergedRegion, State.MERGING_NEW)) {
+        updateRegionState(mergedRegion, State.OFFLINE, null);
+      }
+    }
+  }
+
+  void splitRegion(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+
+    regionStateStore.splitRegion(p, a, b, sn, getRegionReplication(p));
+    synchronized (this) {
+      // After PONR, split is considered to be done.
+      // Update server holdings to be aligned with the meta.
+      Set<HRegionInfo> regions = serverHoldings.get(sn);
+      if (regions == null) {
+        throw new IllegalStateException(sn + " should host some regions");
+      }
+      regions.remove(p);
+      regions.add(a);
+      regions.add(b);
+    }
+  }
+
+  void mergeRegions(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+    regionStateStore.mergeRegions(p, a, b, sn, getRegionReplication(a));
+    synchronized (this) {
+      // After PONR, merge is considered to be done.
+      // Update server holdings to be aligned with the meta.
+      Set<HRegionInfo> regions = serverHoldings.get(sn);
+      if (regions == null) {
+        throw new IllegalStateException(sn + " should host some regions");
+      }
+      regions.remove(a);
+      regions.remove(b);
+      regions.add(p);
+    }
+  }
+
+  private int getRegionReplication(HRegionInfo r) throws IOException {
+    if (tableStateManager != null) {
+      HTableDescriptor htd = server.getTableDescriptors().get(r.getTable());
+      if (htd != null) {
+        return htd.getRegionReplication();
+      }
+    }
+    return 1;
+  }
+
+  /**
+   * At cluster clean re/start, mark all user regions closed except those of tables
+   * that are excluded, such as disabled/disabling/enabling tables. All user regions
+   * and their previous locations are returned.
+   */
+  synchronized Map<HRegionInfo, ServerName> closeAllUserRegions(Set<TableName> excludedTables) {
+    boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty();
+    Set<HRegionInfo> toBeClosed = new HashSet<>(regionStates.size());
+    for(RegionState state: regionStates.values()) {
+      HRegionInfo hri = state.getRegion();
+      if (state.isSplit() || hri.isSplit()) {
+        continue;
+      }
+      TableName tableName = hri.getTable();
+      if (!TableName.META_TABLE_NAME.equals(tableName)
+          && (noExcludeTables || !excludedTables.contains(tableName))) {
+        toBeClosed.add(hri);
+      }
+    }
+    Map<HRegionInfo, ServerName> allUserRegions = new HashMap<>(toBeClosed.size());
+    for (HRegionInfo hri: toBeClosed) {
+      RegionState regionState = updateRegionState(hri, State.CLOSED);
+      allUserRegions.put(hri, regionState.getServerName());
+    }
+    return allUserRegions;
+  }
+
+  /**
+   * Compute the average load across all region servers.
+   * Currently, this uses a very naive computation - just uses the number of
+   * regions being served, ignoring stats about number of requests.
+   * @return the average load
+   */
+  protected synchronized double getAverageLoad() {
+    int numServers = 0, totalLoad = 0;
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      Set<HRegionInfo> regions = e.getValue();
+      ServerName serverName = e.getKey();
+      int regionCount = regions.size();
+      if (serverManager.isServerOnline(serverName)) {
+        totalLoad += regionCount;
+        numServers++;
+      }
+    }
+    if (numServers > 1) {
+      // The master region server holds only a couple regions.
+      // Don't consider this server in calculating the average load
+      // if there are other region servers to avoid possible confusion.
+      Set<HRegionInfo> hris = serverHoldings.get(server.getServerName());
+      if (hris != null) {
+        totalLoad -= hris.size();
+        numServers--;
+      }
+    }
+    return numServers == 0 ? 0.0 :
+      (double)totalLoad / (double)numServers;
+  }
+
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+    return getAssignmentsByTable(false);
+  }
+
+  /**
+   * This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
+   * Can't let out original since it can change and at least the load balancer
+   * wants to iterate this exported list.  We need to synchronize on regions
+   * since all access to this.servers is under a lock on this.regions.
+   * @param forceByCluster a flag to force aggregating the server load to the cluster level
+   * @return A clone of current assignments by table.
+   */
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
+          boolean forceByCluster) {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result;
+    synchronized (this) {
+      result = getTableRSRegionMap(server.getConfiguration().getBoolean(
+              HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false) && !forceByCluster);
+    }
+    Map<ServerName, ServerLoad>
+            onlineSvrs = serverManager.getOnlineServers();
+    // Take care of servers w/o assignments, and remove servers in draining mode
+    List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
+    for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
+      for (ServerName svr: onlineSvrs.keySet()) {
+        if (!map.containsKey(svr)) {
+          map.put(svr, new ArrayList<>());
+        }
+      }
+      map.keySet().removeAll(drainingServers);
+    }
+    return result;
+  }
+
+  private Map<TableName, Map<ServerName, List<HRegionInfo>>> getTableRSRegionMap(Boolean bytable) {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      for (HRegionInfo hri: e.getValue()) {
+        if (hri.isMetaRegion()) continue;
+        TableName tablename = bytable ? hri.getTable() : HConstants.ENSEMBLE_TABLE_NAME;
+        Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
+        if (svrToRegions == null) {
+          svrToRegions = new HashMap<>(serverHoldings.size());
+          result.put(tablename, svrToRegions);
+        }
+        List<HRegionInfo> regions = svrToRegions.get(e.getKey());
+        if (regions == null) {
+          regions = new ArrayList<>();
+          svrToRegions.put(e.getKey(), regions);
+        }
+        regions.add(hri);
+      }
+    }
+    return result;
+  }
+
+  public RegionState getRegionState(final HRegionInfo hri) {
+    return getRegionState(hri.getEncodedName());
+  }
+
+  /**
+   * Returns a clone of region assignments per server
+   * @return a Map of ServerName to a List of HRegionInfo's
+   */
+  protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
+    Map<ServerName, List<HRegionInfo>> regionsByServer = new HashMap<>(serverHoldings.size());
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      regionsByServer.put(e.getKey(), new ArrayList<>(e.getValue()));
+    }
+    return regionsByServer;
+  }
+
+  public synchronized RegionState getRegionState(final String encodedName) {
+    return regionStates.get(encodedName);
+  }
+
+  /**
+   * Get the HRegionInfo from cache, if not there, from the hbase:meta table.
+   * Be careful. Does RPC. Do not hold a lock or synchronize when you call this method.
+   * @param  regionName
+   * @return HRegionInfo for the region
+   */
+  @SuppressWarnings("deprecation")
+  protected HRegionInfo getRegionInfo(final byte [] regionName) {
+    String encodedName = HRegionInfo.encodeRegionName(regionName);
+    RegionState regionState = getRegionState(encodedName);
+    if (regionState != null) {
+      return regionState.getRegion();
+    }
+
+    try {
+      Pair<HRegionInfo, ServerName> p =
+        MetaTableAccessor.getRegion(server.getConnection(), regionName);
+      HRegionInfo hri = p == null ? null : p.getFirst();
+      if (hri != null) {
+        createRegionState(hri);
+      }
+      return hri;
+    } catch (IOException e) {
+      server.abort("Aborting because error occurred while reading "
+        + Bytes.toStringBinary(regionName) + " from hbase:meta", e);
+      return null;
+    }
+  }
+
+  static boolean isOneOfStates(RegionState regionState, State... states) {
+    State s = regionState != null ? regionState.getState() : null;
+    for (State state: states) {
+      if (s == state) return true;
+    }
+    return false;
+  }
+
+  /**
+   * Update a region state. It will be put in transition if not already there.
+   */
+  private RegionState updateRegionState(final HRegionInfo hri,
+      final RegionState.State state, final ServerName serverName, long openSeqNum) {
+    if (state == RegionState.State.FAILED_CLOSE || state == RegionState.State.FAILED_OPEN) {
+      LOG.warn("Failed to open/close " + hri.getShortNameToLog()
+        + " on " + serverName + ", set to " + state);
+    }
+
+    String encodedName = hri.getEncodedName();
+    RegionState regionState = new RegionState(
+      hri, state, System.currentTimeMillis(), serverName);
+    RegionState oldState = getRegionState(encodedName);
+    if (!regionState.equals(oldState)) {
+      LOG.info("Transition " + oldState + " to " + regionState);
+      // Persist region state before updating in-memory info, if needed
+      regionStateStore.updateRegionState(openSeqNum, regionState, oldState);
+    }
+
+    synchronized (this) {
+      RegionState oldRegionState = regionsInTransition.put(encodedName, regionState);
+      // When a region moves from an old region state to a new one, carry the
+      // accumulated RIT duration over to the new region state.
+      if (oldRegionState != null) {
+        regionState.updateRitDuration(oldRegionState.getStamp());
+      }
+      putRegionState(regionState);
+
+      // For these states, the region should be properly closed.
+      // There should be no log splitting issue.
+      if ((state == State.CLOSED || state == State.MERGED
+          || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
+        ServerName last = lastAssignments.get(encodedName);
+        if (last.equals(serverName)) {
+          lastAssignments.remove(encodedName);
+        } else {
+          LOG.warn(encodedName + " moved to " + state + " on "
+            + serverName + ", expected " + last);
+        }
+      }
+
+      // Once a region is opened, record its last assignment right away.
+      if (serverName != null && state == State.OPEN) {
+        ServerName last = lastAssignments.get(encodedName);
+        if (!serverName.equals(last)) {
+          lastAssignments.put(encodedName, serverName);
+          if (last != null && isServerDeadAndNotProcessed(last)) {
+            LOG.warn(encodedName + " moved to " + serverName
+              + ", while it's previous host " + last
+              + " is dead but not processed yet");
+          }
+        }
+      }
+
+      // notify the change
+      this.notifyAll();
+    }
+    return regionState;
+  }
+}
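The last-assignment bookkeeping in updateRegionState boils down to two rules: remember the hosting server when a region reaches OPEN, and forget it once that same server has reported CLOSED, MERGED, or SPLIT. A minimal standalone sketch of that rule, using Strings and a toy State enum rather than the HBase types (class and method names below are illustrative only), could look like:

import java.util.HashMap;
import java.util.Map;

public class LastAssignmentTracker {
  enum State { OPEN, CLOSED, MERGED, SPLIT }

  private final Map<String, String> lastAssignments = new HashMap<>();

  // Mirror of the rule in updateRegionState: record where a region was last
  // opened; clear the record once the same server has closed/merged/split it.
  synchronized void onTransition(String encodedRegion, State state, String server) {
    if (state == State.OPEN && server != null) {
      lastAssignments.put(encodedRegion, server);
    } else if ((state == State.CLOSED || state == State.MERGED || state == State.SPLIT)
        && server != null && server.equals(lastAssignments.get(encodedRegion))) {
      lastAssignments.remove(encodedRegion);
    }
  }

  synchronized String lastServer(String encodedRegion) {
    return lastAssignments.get(encodedRegion);
  }

  public static void main(String[] args) {
    LastAssignmentTracker t = new LastAssignmentTracker();
    t.onTransition("abc123", State.OPEN, "rs1");
    System.out.println(t.lastServer("abc123")); // rs1
    t.onTransition("abc123", State.CLOSED, "rs1");
    System.out.println(t.lastServer("abc123")); // null
  }
}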

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 6ae9f0f..db0a0e5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -57,10 +57,12 @@ import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -74,6 +76,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RetryCounter;
@@ -311,8 +314,7 @@ public class ServerManager {
     }
   }
 
-  @VisibleForTesting
-  public void regionServerReport(ServerName sn,
+  void regionServerReport(ServerName sn,
       ServerLoad sl) throws YouAreDeadException {
     checkIsDead(sn, "REPORT");
     if (null == this.onlineServers.replace(sn, sl)) {
@@ -612,7 +614,12 @@ public class ServerManager {
       return;
     }
 
-    master.getAssignmentManager().submitServerCrash(serverName, true);
+    boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(serverName);
+    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
+    procExec.submitProcedure(new ServerCrashProcedure(
+      procExec.getEnvironment(), serverName, true, carryingMeta));
+    LOG.debug("Added " + serverName + " to dead servers; submitted " +
+      "ServerCrashProcedure, carryingMeta=" + carryingMeta);
 
     // Tell our listeners that a server was removed
     if (!this.listeners.isEmpty()) {
@@ -622,37 +629,6 @@ public class ServerManager {
     }
   }
 
-  /**
-   * Sends an MERGE REGIONS RPC to the specified server to merge the specified
-   * regions.
-   * <p>
-   * A region server could reject the close request because it either does not
-   * have the specified region.
-   * @param server server to merge regions
-   * @param region_a region to merge
-   * @param region_b region to merge
-   * @param forcible true if do a compulsory merge, otherwise we will only 
merge
-   *          two adjacent regions
-   * @throws IOException
-   */
-  public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
-      HRegionInfo region_b, boolean forcible, final User user) throws 
IOException {
-    if (server == null)
-      throw new NullPointerException("Passed server is null");
-    if (region_a == null || region_b == null)
-      throw new NullPointerException("Passed region is null");
-    AdminService.BlockingInterface admin = getRsAdmin(server);
-    if (admin == null) {
-      throw new IOException("Attempting to send MERGE REGIONS RPC to server "
-          + server.toString() + " for region "
-          + region_a.getRegionNameAsString() + ","
-          + region_b.getRegionNameAsString()
-          + " failed because no RPC connection found to this server");
-    }
-    HBaseRpcController controller = newRpcController();
-    ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, 
user);
-  }
-
   @VisibleForTesting
   public void moveFromOnlineToDeadServers(final ServerName sn) {
     synchronized (onlineServers) {
@@ -684,7 +660,9 @@ public class ServerManager {
     }
 
     this.deadservers.add(serverName);
-    master.getAssignmentManager().submitServerCrash(serverName, shouldSplitWal);
+    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
+    procExec.submitProcedure(new ServerCrashProcedure(
+      procExec.getEnvironment(), serverName, shouldSplitWal, false));
   }
 
   /**
@@ -770,8 +748,9 @@ public class ServerManager {
       throw new IOException("Attempting to send OPEN RPC to server " + 
server.toString() +
         " failed because no RPC connection found to this server");
     }
-    OpenRegionRequest request =
-        RequestConverter.buildOpenRegionRequest(server, region, favoredNodes, false);
+    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
+      region, favoredNodes,
+      (RecoveryMode.LOG_REPLAY == this.master.getMasterWalManager().getLogRecoveryMode()));
     try {
       OpenRegionResponse response = admin.openRegion(null, request);
       return ResponseConverter.getRegionOpeningState(response);
@@ -853,8 +832,8 @@ public class ServerManager {
         " failed because no RPC connection found to this server");
     }
 
-    OpenRegionRequest request =
-        RequestConverter.buildOpenRegionRequest(server, regionOpenInfos, false);
+    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
+      (RecoveryMode.LOG_REPLAY == this.master.getMasterWalManager().getLogRecoveryMode()));
     try {
       OpenRegionResponse response = admin.openRegion(null, request);
       return ResponseConverter.getRegionOpeningStateList(response);
@@ -898,6 +877,30 @@ public class ServerManager {
   }
 
   /**
+   * Sends a CLOSE RPC to the specified server to close the specified region for SPLIT or MERGE.
+   * <p>
+   * A region server could reject the close request because it either does not
+   * have the specified region or the region is being split.
+   * @param server server to close a region
+   * @param regionToClose the info of the region(s) to close
+   * @throws IOException
+   */
+  public boolean sendRegionCloseForSplitOrMerge(
+      final ServerName server,
+      final HRegionInfo... regionToClose) throws IOException {
+    if (server == null) {
+      throw new NullPointerException("Passed server is null");
+    }
+    AdminService.BlockingInterface admin = getRsAdmin(server);
+    if (admin == null) {
+      throw new IOException("Attempting to send CLOSE For Split or Merge RPC 
to server " +
+        server.toString() + " failed because no RPC connection found to this 
server.");
+    }
+    HBaseRpcController controller = newRpcController();
+    return ProtobufUtil.closeRegionForSplitOrMerge(controller, admin, server, regionToClose);
+  }
+
+  /**
    * Sends a WARMUP RPC to the specified server to warmup the specified region.
    * <p>
    * A region server could reject the close request because it either does not
@@ -987,7 +990,7 @@ public class ServerManager {
     * @throws IOException
     * @throws RetriesExhaustedException wrapping a ConnectException if failed
     */
-  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
+  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
   throws IOException {
     AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
     if (admin == null) {

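The ServerManager changes above route dead-server handling through the master procedure executor, passing along the shouldSplitWal and carryingMeta flags computed before submission. As a rough analogue of that submission pattern only (plain Java ExecutorService, not the HBase ProcedureExecutor API; all names here are illustrative), a sketch might look like:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CrashSubmitExample {
  // Stand-in for ServerCrashProcedure: carries the dead server plus the two
  // flags the new ServerManager code computes before submission.
  static class CrashTask implements Runnable {
    final String serverName;
    final boolean shouldSplitWal;
    final boolean carryingMeta;

    CrashTask(String serverName, boolean shouldSplitWal, boolean carryingMeta) {
      this.serverName = serverName;
      this.shouldSplitWal = shouldSplitWal;
      this.carryingMeta = carryingMeta;
    }

    @Override
    public void run() {
      // Real recovery work (WAL splitting, meta reassignment, ...) would go here.
      System.out.println("Recovering " + serverName
          + " splitWal=" + shouldSplitWal + " carryingMeta=" + carryingMeta);
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    boolean carryingMeta = true;  // in ServerManager this comes from the AssignmentManager
    executor.submit(new CrashTask("rs1,16020,1700000000000", true, carryingMeta));
    executor.shutdown();
  }
}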
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 2fc2bbb..7017d29 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -710,7 +710,7 @@ public class SplitLogManager {
         long now = EnvironmentEdgeManager.currentTime();
         if (now > lastLog + 5000) {
           lastLog = now;
-          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" 
+ tasks);
+          LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " 
tasks=" + tasks);
         }
       }
       if (resubmitted > 0) {

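The progress message above is guarded by "now > lastLog + 5000", so it is emitted at most once every five seconds. A small self-contained sketch of that time-throttled logging pattern (illustrative class and method names, System.out standing in for the commons-logging Log) is:

public class ThrottledStatusLogger {
  private long lastLog = 0;

  // Emit the status line at most once every five seconds, mirroring the
  // "now > lastLog + 5000" guard around the SplitLogManager progress message.
  synchronized void maybeLogStatus(int total, int unassigned) {
    long now = System.currentTimeMillis();
    if (now > lastLog + 5000) {
      lastLog = now;
      System.out.println("total tasks=" + total + ", unassigned=" + unassigned);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ThrottledStatusLogger logger = new ThrottledStatusLogger();
    for (int i = 0; i < 3; i++) {
      logger.maybeLogStatus(10, 10 - i);  // only the first call prints
      Thread.sleep(100);
    }
  }
}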
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
index 4a2c942..7582d42 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
@@ -313,9 +313,8 @@ public class TableNamespaceManager {
   }
 
   private boolean isTableAssigned() {
-    // TODO: we have a better way now (wait on event)
-    return masterServices.getAssignmentManager()
-        .getRegionStates().hasTableRegionStates(TableName.NAMESPACE_TABLE_NAME);
+    return !masterServices.getAssignmentManager()
+        .getRegionStates().getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME).isEmpty();
   }
 
   public void validateTableAndRegionCount(NamespaceDescriptor desc) throws 
IOException {

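isTableAssigned above is typically polled until the namespace table has at least one region assigned. A generic sketch of that wait-for-condition loop, with a BooleanSupplier standing in for the isTableAssigned() check (the helper and its timeouts are illustrative, not part of the patch), might be:

import java.util.function.BooleanSupplier;

public class WaitForAssignment {
  // Poll a condition such as "the namespace table has at least one region
  // assigned" until it holds or the timeout expires.
  static boolean waitFor(BooleanSupplier condition, long timeoutMs, long intervalMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(intervalMs);
    }
    return condition.getAsBoolean();
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    // Toy condition: becomes true after ~300 ms, standing in for isTableAssigned().
    boolean assigned = waitFor(() -> System.currentTimeMillis() - start > 300, 2000, 50);
    System.out.println("assigned=" + assigned);
  }
}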
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
index dfc4321..96ea036 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
@@ -183,9 +183,8 @@ public class TableStateManager {
 
   @Nullable
   protected TableState readMetaState(TableName tableName) throws IOException {
-    if (tableName.equals(TableName.META_TABLE_NAME)) {
+    if (tableName.equals(TableName.META_TABLE_NAME))
       return new TableState(tableName, TableState.State.ENABLED);
-    }
     return MetaTableAccessor.getTableState(master.getConnection(), tableName);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
new file mode 100644
index 0000000..ccff6f0
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * A callable object that invokes the corresponding action that needs to be
+ * taken to unassign a region in transition. Because it is implemented as a
+ * Callable run via a Future, callers can act on the timeout asynchronously.
+ */
+@InterfaceAudience.Private
+public class UnAssignCallable implements Callable<Object> {
+  private AssignmentManager assignmentManager;
+
+  private HRegionInfo hri;
+
+  public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
+    this.assignmentManager = assignmentManager;
+    this.hri = hri;
+  }
+
+  @Override
+  public Object call() throws Exception {
+    assignmentManager.unassign(hri);
+    return null;
+  }
+}
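The javadoc's point about acting on timeouts is the standard Callable/Future pattern: submit the callable to an executor and bound the wait with Future.get(timeout). A self-contained sketch of that usage, with a stub callable standing in for UnAssignCallable (the stub and its timings are illustrative only), is:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class UnassignWithTimeoutExample {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();

    // Stand-in for UnAssignCallable: pretend the unassign takes a while.
    Callable<Object> unassign = () -> {
      Thread.sleep(200);
      return null;
    };

    Future<Object> future = pool.submit(unassign);
    try {
      future.get(50, TimeUnit.MILLISECONDS);   // bound the wait on the unassign
      System.out.println("unassign completed in time");
    } catch (TimeoutException e) {
      System.out.println("unassign timed out; caller can retry or escalate");
      future.cancel(true);
    } finally {
      pool.shutdown();
    }
  }
}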

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
deleted file mode 100644
index 42ece16..0000000
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.assignment;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import 
org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AssignRegionStateData;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-
-/**
- * Procedure that describe the assignment of a single region.
- * There can only be one RegionTransitionProcedure per region running at a time
- * since each procedure takes a lock on the region.
- *
- * <p>The Assign starts by pushing the "assign" operation to the 
AssignmentManager
- * and then will go in a "waiting" state.
- * The AM will batch the "assign" requests and ask the Balancer where to put
- * the region (the various policies will be respected: retain, round-robin, 
random).
- * Once the AM and the balancer have found a place for the region the procedure
- * will be resumed and an "open region" request will be placed in the Remote 
Dispatcher
- * queue, and the procedure once again will go in a "waiting state".
- * The Remote Dispatcher will batch the various requests for that server and
- * they will be sent to the RS for execution.
- * The RS will complete the open operation by calling 
master.reportRegionStateTransition().
- * The AM will intercept the transition report, and notify the procedure.
- * The procedure will finish the assignment by publishing to new state on meta
- * or it will retry the assignment.
- *
- * <p>This procedure does not rollback when beyond the first
- * REGION_TRANSITION_QUEUE step; it will press on trying to assign in the face 
of
- * failure. Should we ignore rollback calls to Assign/Unassign then? Or just
- * remove rollback here?
- */
-@InterfaceAudience.Private
-public class AssignProcedure extends RegionTransitionProcedure {
-  private static final Log LOG = LogFactory.getLog(AssignProcedure.class);
-
-  private boolean forceNewPlan = false;
-
-  /**
-   * Gets set as desired target on move, merge, etc., when we want to go to a 
particular server.
-   * We may not be able to respect this request but will try. When it is NOT 
set, then we ask
-   * the balancer to assign. This value is used below in startTransition to 
set regionLocation if
-   * non-null. Setting regionLocation in regionServerNode is how we override 
balancer setting
-   * destination.
-   */
-  protected volatile ServerName targetServer;
-
-  public AssignProcedure() {
-    // Required by the Procedure framework to create the procedure on replay
-    super();
-  }
-
-  public AssignProcedure(final HRegionInfo regionInfo) {
-    this(regionInfo, false);
-  }
-
-  public AssignProcedure(final HRegionInfo regionInfo, final boolean 
forceNewPlan) {
-    super(regionInfo);
-    this.forceNewPlan = forceNewPlan;
-    this.targetServer = null;
-  }
-
-  public AssignProcedure(final HRegionInfo regionInfo, final ServerName 
destinationServer) {
-    super(regionInfo);
-    this.forceNewPlan = false;
-    this.targetServer = destinationServer;
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.REGION_ASSIGN;
-  }
-
-  @Override
-  protected boolean isRollbackSupported(final RegionTransitionState state) {
-    switch (state) {
-      case REGION_TRANSITION_QUEUE:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  @Override
-  public void serializeStateData(final OutputStream stream) throws IOException 
{
-    final AssignRegionStateData.Builder state = 
AssignRegionStateData.newBuilder()
-        .setTransitionState(getTransitionState())
-        .setRegionInfo(HRegionInfo.convert(getRegionInfo()));
-    if (forceNewPlan) {
-      state.setForceNewPlan(true);
-    }
-    if (this.targetServer != null) {
-      state.setTargetServer(ProtobufUtil.toServerName(this.targetServer));
-    }
-    state.build().writeDelimitedTo(stream);
-  }
-
-  @Override
-  public void deserializeStateData(final InputStream stream) throws 
IOException {
-    final AssignRegionStateData state = 
AssignRegionStateData.parseDelimitedFrom(stream);
-    setTransitionState(state.getTransitionState());
-    setRegionInfo(HRegionInfo.convert(state.getRegionInfo()));
-    forceNewPlan = state.getForceNewPlan();
-    if (state.hasTargetServer()) {
-      this.targetServer = ProtobufUtil.toServerName(state.getTargetServer());
-    }
-  }
-
-  @Override
-  protected boolean startTransition(final MasterProcedureEnv env, final 
RegionStateNode regionNode)
-      throws IOException {
-    // If the region is already open we can't do much...
-    if (regionNode.isInState(State.OPEN) && isServerOnline(env, regionNode)) {
-      LOG.info("Assigned, not reassigning; " + this + "; " + 
regionNode.toShortString());
-      return false;
-    }
-    // If the region is SPLIT, we can't assign it.
-    if (regionNode.isInState(State.SPLIT)) {
-      LOG.info("SPLIT, cannot be assigned; " + this + "; " + 
regionNode.toShortString());
-      return false;
-    }
-
-    // If we haven't started the operation yet, we can abort
-    if (aborted.get() && regionNode.isInState(State.CLOSED, State.OFFLINE)) {
-      if (incrementAndCheckMaxAttempts(env, regionNode)) {
-        regionNode.setState(State.FAILED_OPEN);
-        setFailure(getClass().getSimpleName(),
-          new RetriesExhaustedException("Max attempts exceeded"));
-      } else {
-        setAbortFailure(getClass().getSimpleName(), "Abort requested");
-      }
-      return false;
-    }
-
-    // Send assign (add into assign-pool). Region is now in OFFLINE state. 
Setting offline state
-    // scrubs what was the old region location. Setting a new regionLocation 
here is how we retain
-    // old assignment or specify target server if a move or merge. See
-    // AssignmentManager#processAssignQueue. Otherwise, balancer gives us 
location.
-    ServerName lastRegionLocation = regionNode.offline();
-    boolean retain = false;
-    if (!forceNewPlan) {
-      if (this.targetServer != null) {
-        retain = targetServer.equals(lastRegionLocation);
-        regionNode.setRegionLocation(targetServer);
-      } else {
-        if (lastRegionLocation != null) {
-          // Try and keep the location we had before we offlined.
-          retain = true;
-          regionNode.setRegionLocation(lastRegionLocation);
-        }
-      }
-    }
-    LOG.info("Start " + this + "; " + regionNode.toShortString() +
-        "; forceNewPlan=" + this.forceNewPlan +
-        ", retain=" + retain);
-    env.getAssignmentManager().queueAssign(regionNode);
-    return true;
-  }
-
-  @Override
-  protected boolean updateTransition(final MasterProcedureEnv env, final 
RegionStateNode regionNode)
-  throws IOException, ProcedureSuspendedException {
-    // TODO: crash if destinationServer is specified and not online
-    // which is also the case when the balancer provided us with a different 
location.
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Update " + this + "; " + regionNode.toShortString());
-    }
-    if (regionNode.getRegionLocation() == null) {
-      setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
-      return true;
-    }
-
-    if (!isServerOnline(env, regionNode)) {
-      // TODO: is this correct? should we wait the chore/ssh?
-      LOG.info("Server not online, re-queuing " + this + "; " + 
regionNode.toShortString());
-      setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
-      return true;
-    }
-
-    if 
(env.getAssignmentManager().waitServerReportEvent(regionNode.getRegionLocation(),
 this)) {
-      LOG.info("Early suspend! " + this + "; " + regionNode.toShortString());
-      throw new ProcedureSuspendedException();
-    }
-
-    if (regionNode.isInState(State.OPEN)) {
-      LOG.info("Already assigned: " + this + "; " + 
regionNode.toShortString());
-      return false;
-    }
-
-    // Transition regionNode State. Set it to OPENING. Update hbase:meta, and 
add
-    // region to list of regions on the target regionserver. Need to UNDO if 
failure!
-    env.getAssignmentManager().markRegionAsOpening(regionNode);
-
-    // TODO: Requires a migration to be open by the RS?
-    // regionNode.getFormatVersion()
-
-    if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
-      // Failed the dispatch BUT addToRemoteDispatcher internally does
-      // cleanup on failure -- even the undoing of markRegionAsOpening above --
-      // so nothing more to do here; in fact we need to get out of here
-      // fast since we've been put back on the scheduler.
-    }
-
-    // We always return true, even if we fail dispatch because 
addToRemoteDispatcher
-    // failure processing sets state back to REGION_TRANSITION_QUEUE so we try 
again;
-    // i.e. return true to keep the Procedure running; it has been reset to 
startover.
-    return true;
-  }
-
-  @Override
-  protected void finishTransition(final MasterProcedureEnv env, final 
RegionStateNode regionNode)
-      throws IOException {
-    env.getAssignmentManager().markRegionAsOpened(regionNode);
-    // This success may have been after we failed open a few times. Be sure to 
cleanup any
-    // failed open references. See #incrementAndCheckMaxAttempts and where it 
is called.
-    
env.getAssignmentManager().getRegionStates().removeFromFailedOpen(regionNode.getRegionInfo());
-  }
-
-  @Override
-  protected void reportTransition(final MasterProcedureEnv env, final 
RegionStateNode regionNode,
-      final TransitionCode code, final long openSeqNum) throws 
UnexpectedStateException {
-    switch (code) {
-      case OPENED:
-        if (openSeqNum < 0) {
-          throw new UnexpectedStateException("Received report unexpected " + 
code +
-              " transition openSeqNum=" + openSeqNum + ", " + regionNode);
-        }
-        if (openSeqNum < regionNode.getOpenSeqNum()) {
-          LOG.warn("Skipping update of open seqnum with " + openSeqNum +
-              " because current seqnum=" + regionNode.getOpenSeqNum());
-        }
-        regionNode.setOpenSeqNum(openSeqNum);
-        // Leave the state here as OPENING for now. We set it to OPEN in
-        // REGION_TRANSITION_FINISH section where we do a bunch of checks.
-        // regionNode.setState(RegionState.State.OPEN, 
RegionState.State.OPENING);
-        setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
-        break;
-      case FAILED_OPEN:
-        handleFailure(env, regionNode);
-        break;
-      default:
-        throw new UnexpectedStateException("Received report unexpected " + 
code +
-            " transition openSeqNum=" + openSeqNum + ", " + 
regionNode.toShortString() +
-            ", " + this + ", expected OPENED or FAILED_OPEN.");
-    }
-  }
-
-  /**
-   * Called when dispatch or subsequent OPEN request fail. Can be run by the
-   * inline dispatch call or later by the ServerCrashProcedure. Our state is
-   * generally OPENING. Cleanup and reset to OFFLINE and put our Procedure
-   * State back to REGION_TRANSITION_QUEUE so the Assign starts over.
-   */
-  private void handleFailure(final MasterProcedureEnv env, final 
RegionStateNode regionNode) {
-    if (incrementAndCheckMaxAttempts(env, regionNode)) {
-      aborted.set(true);
-    }
-    this.forceNewPlan = true;
-    this.targetServer = null;
-    regionNode.offline();
-    // We were moved to OPENING state before dispatch. Undo. It is safe to call
-    // this method because it checks for OPENING first.
-    env.getAssignmentManager().undoRegionAsOpening(regionNode);
-    setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
-  }
-
-  private boolean incrementAndCheckMaxAttempts(final MasterProcedureEnv env,
-      final RegionStateNode regionNode) {
-    final int retries = env.getAssignmentManager().getRegionStates().
-        addToFailedOpen(regionNode).incrementAndGetRetries();
-    int max = env.getAssignmentManager().getAssignMaxAttempts();
-    LOG.info("Retry=" + retries + " of max=" + max + "; " +
-        this + "; " + regionNode.toShortString());
-    return retries >= max;
-  }
-
-  @Override
-  public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final 
ServerName serverName) {
-    assert serverName.equals(getRegionState(env).getRegionLocation());
-    return new RegionOpenOperation(this, getRegionInfo(),
-        env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
-  }
-
-  @Override
-  protected void remoteCallFailed(final MasterProcedureEnv env, final 
RegionStateNode regionNode,
-      final IOException exception) {
-    handleFailure(env, regionNode);
-  }
-
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    super.toStringClassDetails(sb);
-    if (this.targetServer != null) sb.append(", 
target=").append(this.targetServer);
-  }
-
-  @Override
-  public ServerName getServer(final MasterProcedureEnv env) {
-    RegionStateNode node =
-        
env.getAssignmentManager().getRegionStates().getRegionNode(this.getRegionInfo());
-    if (node == null) return null;
-    return node.getRegionLocation();
-  }
-}
\ No newline at end of file
