http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java new file mode 100644 index 0000000..69ebd97 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -0,0 +1,3053 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CoordinatedStateException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.RegionStateListener; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MasterSwitchType; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.executor.ExecutorService; 
+import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; +import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.quotas.QuotaExceededException; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; +import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; + +/** + * Manages and performs region assignment. + * Related communications with regionserver are all done over RPC. + */ +@InterfaceAudience.Private +public class AssignmentManager { + private static final Log LOG = LogFactory.getLog(AssignmentManager.class); + + protected final MasterServices server; + + private ServerManager serverManager; + + private boolean shouldAssignRegionsWithFavoredNodes; + + private LoadBalancer balancer; + + private final MetricsAssignmentManager metricsAssignmentManager; + + private AtomicInteger numRegionsOpened = new AtomicInteger(0); + + final private KeyLocker<String> locker = new KeyLocker<>(); + + Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>()); + + /** + * Map of regions to reopen after the schema of a table is changed. Key - + * encoded region name, value - HRegionInfo + */ + private final Map <String, HRegionInfo> regionsToReopen; + + /* + * Maximum times we recurse an assignment/unassignment. + * See below in {@link #assign()} and {@link #unassign()}. + */ + private final int maximumAttempts; + + /** + * The sleep time for which the assignment will wait before retrying in case of + * hbase:meta assignment failure due to lack of availability of region plan or bad region plan + */ + private final long sleepTimeBeforeRetryingMetaAssignment; + + /** Plans for region movement. Key is the encoded version of a region name*/ + // TODO: When do plans get cleaned out? Ever? In server open and in server + // shutdown processing -- St.Ack + // All access to this Map must be synchronized. + final NavigableMap<String, RegionPlan> regionPlans = new TreeMap<>(); + + private final TableStateManager tableStateManager; + + private final ExecutorService executorService; + + private java.util.concurrent.ExecutorService threadPoolExecutorService; + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + private final RegionStates regionStates; + + // The threshold to use bulk assigning. 
Using bulk assignment + // only if assigning at least this many regions to at least this + // many servers. If assigning fewer regions to fewer servers, + // bulk assigning may be not as efficient. + private final int bulkAssignThresholdRegions; + private final int bulkAssignThresholdServers; + private final int bulkPerRegionOpenTimeGuesstimate; + + // Should bulk assignment wait till all regions are assigned, + // or it is timed out? This is useful to measure bulk assignment + // performance, but not needed in most use cases. + private final boolean bulkAssignWaitTillAllAssigned; + + /** + * Indicator that AssignmentManager has recovered the region states so + * that ServerShutdownHandler can be fully enabled and re-assign regions + * of dead servers. So that when re-assignment happens, AssignmentManager + * has proper region states. + * + * Protected to ease testing. + */ + protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false); + + /** + * A map to track the count a region fails to open in a row. + * So that we don't try to open a region forever if the failure is + * unrecoverable. We don't put this information in region states + * because we don't expect this to happen frequently; we don't + * want to copy this information over during each state transition either. + */ + private final ConcurrentHashMap<String, AtomicInteger> failedOpenTracker = new ConcurrentHashMap<>(); + + // In case not using ZK for region assignment, region states + // are persisted in meta with a state store + private final RegionStateStore regionStateStore; + + /** + * For testing only! Set to true to skip handling of split. + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") + public static boolean TEST_SKIP_SPLIT_HANDLING = false; + + /** Listeners that are called on assignment events. */ + private List<AssignmentListener> listeners = new CopyOnWriteArrayList<>(); + + private RegionStateListener regionStateListener; + + private RetryCounter.BackoffPolicy backoffPolicy; + private RetryCounter.RetryConfig retryConfig; + /** + * Constructs a new assignment manager. + * + * @param server instance of HMaster this AM running inside + * @param serverManager serverManager for associated HMaster + * @param balancer implementation of {@link LoadBalancer} + * @param service Executor service + * @param metricsMaster metrics manager + * @throws IOException + */ + public AssignmentManager(MasterServices server, ServerManager serverManager, + final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, + final TableStateManager tableStateManager) + throws IOException { + this.server = server; + this.serverManager = serverManager; + this.executorService = service; + this.regionStateStore = new RegionStateStore(server); + this.regionsToReopen = Collections.synchronizedMap + (new HashMap<String, HRegionInfo> ()); + Configuration conf = server.getConfiguration(); + + this.tableStateManager = tableStateManager; + + // This is the max attempts, not retries, so it should be at least 1. + this.maximumAttempts = Math.max(1, + this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); + this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( + "hbase.meta.assignment.retry.sleeptime", 1000l); + this.balancer = balancer; + // Only read favored nodes if using the favored nodes load balancer. 
+ this.shouldAssignRegionsWithFavoredNodes = this.balancer instanceof FavoredNodesPromoter; + int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); + + this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( + maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); + + this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, + Threads.newDaemonThreadFactory("AM.Scheduler")); + + this.regionStates = new RegionStates( + server, tableStateManager, serverManager, regionStateStore); + + this.bulkAssignWaitTillAllAssigned = + conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); + this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); + this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); + this.bulkPerRegionOpenTimeGuesstimate = + conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); + + this.metricsAssignmentManager = new MetricsAssignmentManager(); + + // Configurations for retrying opening a region on receiving a FAILED_OPEN + this.retryConfig = new RetryCounter.RetryConfig(); + this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l)); + // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy + // if the user does not set a max sleep limit + this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max", + retryConfig.getSleepInterval())); + this.backoffPolicy = getBackoffPolicy(); + } + + /** + * Returns the backoff policy used for Failed Region Open retries + * @return the backoff policy used for Failed Region Open retries + */ + RetryCounter.BackoffPolicy getBackoffPolicy() { + return new RetryCounter.ExponentialBackoffPolicyWithLimit(); + } + + MetricsAssignmentManager getAssignmentManagerMetrics() { + return this.metricsAssignmentManager; + } + + /** + * Add the listener to the notification list. + * @param listener The AssignmentListener to register + */ + public void registerListener(final AssignmentListener listener) { + this.listeners.add(listener); + } + + /** + * Remove the listener from the notification list. + * @param listener The AssignmentListener to unregister + */ + public boolean unregisterListener(final AssignmentListener listener) { + return this.listeners.remove(listener); + } + + /** + * @return Instance of ZKTableStateManager. + */ + public TableStateManager getTableStateManager() { + // These are 'expensive' to make involving trip to zk ensemble so allow + // sharing. + return this.tableStateManager; + } + + /** + * This SHOULD not be public. It is public now + * because of some unit tests. + * + * TODO: make it package private and keep RegionStates in the master package + */ + public RegionStates getRegionStates() { + return regionStates; + } + + /** + * Used in some tests to mock up region state in meta + */ + @VisibleForTesting + RegionStateStore getRegionStateStore() { + return regionStateStore; + } + + public RegionPlan getRegionReopenPlan(HRegionInfo hri) { + return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri)); + } + + /** + * Add a regionPlan for the specified region. + * @param encodedName + * @param plan + */ + public void addPlan(String encodedName, RegionPlan plan) { + synchronized (regionPlans) { + regionPlans.put(encodedName, plan); + } + } + + /** + * Add a map of region plans. 
+ */ + public void addPlans(Map<String, RegionPlan> plans) { + synchronized (regionPlans) { + regionPlans.putAll(plans); + } + } + + /** + * Set the list of regions that will be reopened + * because of an update in table schema + * + * @param regions + * list of regions that should be tracked for reopen + */ + public void setRegionsToReopen(List <HRegionInfo> regions) { + for(HRegionInfo hri : regions) { + regionsToReopen.put(hri.getEncodedName(), hri); + } + } + + /** + * Used by the client to identify if all regions have the schema updates + * + * @param tableName + * @return Pair indicating the status of the alter command + * @throws IOException + */ + public Pair<Integer, Integer> getReopenStatus(TableName tableName) + throws IOException { + List<HRegionInfo> hris; + if (TableName.META_TABLE_NAME.equals(tableName)) { + hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper()); + } else { + hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true); + } + + Integer pending = 0; + for (HRegionInfo hri : hris) { + String name = hri.getEncodedName(); + // no lock concurrent access ok: sequential consistency respected. + if (regionsToReopen.containsKey(name) + || regionStates.isRegionInTransition(name)) { + pending++; + } + } + return new Pair<>(pending, hris.size()); + } + + /** + * Used by ServerShutdownHandler to make sure AssignmentManager has completed + * the failover cleanup before re-assigning regions of dead servers. So that + * when re-assignment happens, AssignmentManager has proper region states. + */ + public boolean isFailoverCleanupDone() { + return failoverCleanupDone.get(); + } + + /** + * To avoid racing with AM, external entities may need to lock a region, + * for example, when SSH checks what regions to skip re-assigning. + */ + public Lock acquireRegionLock(final String encodedName) { + return locker.acquireLock(encodedName); + } + + /** + * Now, failover cleanup is completed. Notify server manager to + * process queued up dead servers processing, if any. + */ + void failoverCleanupDone() { + failoverCleanupDone.set(true); + serverManager.processQueuedDeadServers(); + } + + /** + * Called on startup. + * Figures whether a fresh cluster start of we are joining extant running cluster. + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + * @throws CoordinatedStateException + */ + void joinCluster() + throws IOException, KeeperException, InterruptedException, CoordinatedStateException { + long startTime = System.currentTimeMillis(); + // Concurrency note: In the below the accesses on regionsInTransition are + // outside of a synchronization block where usually all accesses to RIT are + // synchronized. The presumption is that in this case it is safe since this + // method is being played by a single thread on startup. + + // TODO: Regions that have a null location and are not in regionsInTransitions + // need to be handled. + + // Scan hbase:meta to build list of existing regions, servers, and assignment + // Returns servers who have not checked in (assumed dead) that some regions + // were assigned to (according to the meta) + Set<ServerName> deadServers = rebuildUserRegions(); + + // This method will assign all user regions if a clean server startup or + // it will reconstruct master state and cleanup any leftovers from previous master process. 
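+    // The call below checks, in order: known dead servers that still carried regions,
+    // user regions already assigned to live servers, user regions in transition on
+    // live servers, and leftover WALs under re-queued dead servers. If none of these
+    // is found, the startup is treated as a clean start and all user regions are assigned.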
+ boolean failover = processDeadServersAndRegionsInTransition(deadServers); + + LOG.info("Joined the cluster in " + (System.currentTimeMillis() + - startTime) + "ms, failover=" + failover); + } + + /** + * Process all regions that are in transition in zookeeper and also + * processes the list of dead servers. + * Used by master joining an cluster. If we figure this is a clean cluster + * startup, will assign all user regions. + * @param deadServers Set of servers that are offline probably legitimately that were carrying + * regions according to a scan of hbase:meta. Can be null. + * @throws IOException + * @throws InterruptedException + */ + boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers) + throws KeeperException, IOException, InterruptedException, CoordinatedStateException { + // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode); + boolean failover = !serverManager.getDeadServers().isEmpty(); + if (failover) { + // This may not be a failover actually, especially if meta is on this master. + if (LOG.isDebugEnabled()) { + LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers()); + } + // Check if there are any regions on these servers + failover = false; + for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) { + if (regionStates.getRegionAssignments().values().contains(serverName)) { + LOG.debug("Found regions on dead server: " + serverName); + failover = true; + break; + } + } + } + Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet(); + if (!failover) { + // If any one region except meta is assigned, it's a failover. + for (Map.Entry<HRegionInfo, ServerName> en: + regionStates.getRegionAssignments().entrySet()) { + HRegionInfo hri = en.getKey(); + if (!hri.isMetaTable() + && onlineServers.contains(en.getValue())) { + LOG.debug("Found region " + hri + " out on cluster"); + failover = true; + break; + } + } + } + if (!failover) { + // If any region except meta is in transition on a live server, it's a failover. + Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition(); + if (!regionsInTransition.isEmpty()) { + for (RegionState regionState: regionsInTransition) { + ServerName serverName = regionState.getServerName(); + if (!regionState.getRegion().isMetaRegion() + && serverName != null && onlineServers.contains(serverName)) { + LOG.debug("Found " + regionState + " for region " + + regionState.getRegion().getRegionNameAsString() + " for server " + + serverName + "in RITs"); + failover = true; + break; + } + } + } + } + if (!failover) { + // If we get here, we have a full cluster restart. It is a failover only + // if there are some WALs are not split yet. For meta WALs, they should have + // been split already, if any. We can walk through those queued dead servers, + // if they don't have any WALs, this restart should be considered as a clean one + Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet(); + if (!queuedDeadServers.isEmpty()) { + Configuration conf = server.getConfiguration(); + Path walRootDir = FSUtils.getWALRootDir(conf); + FileSystem walFs = FSUtils.getWALFileSystem(conf); + for (ServerName serverName: queuedDeadServers) { + // In the case of a clean exit, the shutdown handler would have presplit any WALs and + // removed empty directories. 
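+          // Any WAL content still present below (including a leftover -splitting
+          // directory) therefore means the server did not exit cleanly, so this
+          // restart is treated as a failover.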
+ Path walDir = new Path(walRootDir, + AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); + if (checkWals(walFs, walDir) || checkWals(walFs, splitDir)) { + LOG.debug("Found queued dead server " + serverName); + failover = true; + break; + } + } + if (!failover) { + // We figured that it's not a failover, so no need to + // work on these re-queued dead servers any more. + LOG.info("AM figured that it's not a failover and cleaned up " + + queuedDeadServers.size() + " queued dead servers"); + serverManager.removeRequeuedDeadServers(); + } + } + } + + Set<TableName> disabledOrDisablingOrEnabling = null; + Map<HRegionInfo, ServerName> allRegions = null; + + if (!failover) { + disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates( + TableState.State.DISABLED, TableState.State.DISABLING, + TableState.State.ENABLING); + + // Clean re/start, mark all user regions closed before reassignment + allRegions = regionStates.closeAllUserRegions( + disabledOrDisablingOrEnabling); + } + + // Now region states are restored + regionStateStore.start(); + + if (failover) { + if (deadServers != null && !deadServers.isEmpty()) { + for (ServerName serverName: deadServers) { + if (!serverManager.isServerDead(serverName)) { + serverManager.expireServer(serverName); // Let SSH do region re-assign + } + } + } + processRegionsInTransition(regionStates.getRegionsInTransition()); + } + + // Now we can safely claim failover cleanup completed and enable + // ServerShutdownHandler for further processing. The nodes (below) + // in transition, if any, are for regions not related to those + // dead servers at all, and can be done in parallel to SSH. + failoverCleanupDone(); + if (!failover) { + // Fresh cluster startup. + LOG.info("Clean cluster startup. Don't reassign user regions"); + assignAllUserRegions(allRegions); + } else { + LOG.info("Failover! Reassign user regions"); + } + // unassign replicas of the split parents and the merged regions + // the daughter replicas are opened in assignAllUserRegions if it was + // not already opened. 
+ for (HRegionInfo h : replicasToClose) { + unassign(h); + } + replicasToClose.clear(); + return failover; + } + + private boolean checkWals(FileSystem fs, Path dir) throws IOException { + if (!fs.exists(dir)) { + LOG.debug(dir + " doesn't exist"); + return false; + } + if (!fs.getFileStatus(dir).isDirectory()) { + LOG.warn(dir + " is not a directory"); + return false; + } + FileStatus[] files = FSUtils.listStatus(fs, dir); + if (files == null || files.length == 0) { + LOG.debug(dir + " has no files"); + return false; + } + for (int i = 0; i < files.length; i++) { + if (files[i].isFile() && files[i].getLen() > 0) { + LOG.debug(dir + " has a non-empty file: " + files[i].getPath()); + return true; + } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) { + LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath()); + return true; + } + } + LOG.debug("Found 0 non-empty wal files for :" + dir); + return false; + } + + /** + * When a region is closed, it should be removed from the regionsToReopen + * @param hri HRegionInfo of the region which was closed + */ + public void removeClosedRegion(HRegionInfo hri) { + if (regionsToReopen.remove(hri.getEncodedName()) != null) { + LOG.debug("Removed region from reopening regions because it was closed"); + } + } + + void processFavoredNodesForDaughters(HRegionInfo parent, + HRegionInfo regionA, HRegionInfo regionB) throws IOException { + if (shouldAssignFavoredNodes(parent)) { + List<ServerName> onlineServers = this.serverManager.getOnlineServersList(); + ((FavoredNodesPromoter) this.balancer). + generateFavoredNodesForDaughter(onlineServers, parent, regionA, regionB); + } + } + + void processFavoredNodesForMerge(HRegionInfo merged, HRegionInfo regionA, HRegionInfo regionB) + throws IOException { + if (shouldAssignFavoredNodes(merged)) { + ((FavoredNodesPromoter)this.balancer). + generateFavoredNodesForMergedRegion(merged, regionA, regionB); + } + } + + /* + * Favored nodes should be applied only when FavoredNodes balancer is configured and the region + * belongs to a non-system table. + */ + private boolean shouldAssignFavoredNodes(HRegionInfo region) { + return this.shouldAssignRegionsWithFavoredNodes + && FavoredNodesManager.isFavoredNodeApplicable(region); + } + + /** + * Marks the region as online. Removes it from regions in transition and + * updates the in-memory assignment information. + * <p> + * Used when a region has been successfully opened on a region server. + * @param regionInfo + * @param sn + */ + void regionOnline(HRegionInfo regionInfo, ServerName sn) { + regionOnline(regionInfo, sn, HConstants.NO_SEQNUM); + } + + void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) { + numRegionsOpened.incrementAndGet(); + regionStates.regionOnline(regionInfo, sn, openSeqNum); + + // Remove plan if one. + clearRegionPlan(regionInfo); + balancer.regionOnline(regionInfo, sn); + + // Tell our listeners that a region was opened + sendRegionOpenedNotification(regionInfo, sn); + } + + /** + * Marks the region as offline. Removes it from regions in transition and + * removes in-memory assignment information. + * <p> + * Used when a region has been closed and should remain closed. 
+ * @param regionInfo + */ + public void regionOffline(final HRegionInfo regionInfo) { + regionOffline(regionInfo, null); + } + + public void offlineDisabledRegion(HRegionInfo regionInfo) { + replicasToClose.remove(regionInfo); + regionOffline(regionInfo); + } + + // Assignment methods + + /** + * Assigns the specified region. + * <p> + * If a RegionPlan is available with a valid destination then it will be used + * to determine what server region is assigned to. If no RegionPlan is + * available, region will be assigned to a random available server. + * <p> + * Updates the RegionState and sends the OPEN RPC. + * <p> + * This will only succeed if the region is in transition and in a CLOSED or + * OFFLINE state or not in transition, and of course, the + * chosen server is up and running (It may have just crashed!). + * + * @param region server to be assigned + */ + public void assign(HRegionInfo region) { + assign(region, false); + } + + /** + * Use care with forceNewPlan. It could cause double assignment. + */ + public void assign(HRegionInfo region, boolean forceNewPlan) { + if (isDisabledorDisablingRegionInRIT(region)) { + return; + } + String encodedName = region.getEncodedName(); + Lock lock = locker.acquireLock(encodedName); + try { + RegionState state = forceRegionStateToOffline(region, forceNewPlan); + if (state != null) { + if (regionStates.wasRegionOnDeadServer(encodedName)) { + LOG.info("Skip assigning " + region.getRegionNameAsString() + + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName) + + " is dead but not processed yet"); + return; + } + assign(state, forceNewPlan); + } + } finally { + lock.unlock(); + } + } + + /** + * Bulk assign regions to <code>destination</code>. + * @param destination + * @param regions Regions to assign. 
+ * @return true if successful + */ + boolean assign(final ServerName destination, final List<HRegionInfo> regions) + throws InterruptedException { + long startTime = EnvironmentEdgeManager.currentTime(); + try { + int regionCount = regions.size(); + if (regionCount == 0) { + return true; + } + LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString()); + Set<String> encodedNames = new HashSet<>(regionCount); + for (HRegionInfo region : regions) { + encodedNames.add(region.getEncodedName()); + } + + List<HRegionInfo> failedToOpenRegions = new ArrayList<>(); + Map<String, Lock> locks = locker.acquireLocks(encodedNames); + try { + Map<String, RegionPlan> plans = new HashMap<>(regionCount); + List<RegionState> states = new ArrayList<>(regionCount); + for (HRegionInfo region : regions) { + String encodedName = region.getEncodedName(); + if (!isDisabledorDisablingRegionInRIT(region)) { + RegionState state = forceRegionStateToOffline(region, false); + boolean onDeadServer = false; + if (state != null) { + if (regionStates.wasRegionOnDeadServer(encodedName)) { + LOG.info("Skip assigning " + region.getRegionNameAsString() + + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName) + + " is dead but not processed yet"); + onDeadServer = true; + } else { + RegionPlan plan = new RegionPlan(region, state.getServerName(), destination); + plans.put(encodedName, plan); + states.add(state); + continue; + } + } + // Reassign if the region wasn't on a dead server + if (!onDeadServer) { + LOG.info("failed to force region state to offline, " + + "will reassign later: " + region); + failedToOpenRegions.add(region); // assign individually later + } + } + // Release the lock, this region is excluded from bulk assign because + // we can't update its state, or set its znode to offline. + Lock lock = locks.remove(encodedName); + lock.unlock(); + } + + if (server.isStopped()) { + return false; + } + + // Add region plans, so we can updateTimers when one region is opened so + // that unnecessary timeout on RIT is reduced. + this.addPlans(plans); + + List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos = new ArrayList<>(states.size()); + for (RegionState state: states) { + HRegionInfo region = state.getRegion(); + regionStates.updateRegionState( + region, State.PENDING_OPEN, destination); + List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; + if (shouldAssignFavoredNodes(region)) { + favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region); + } + regionOpenInfos.add(new Pair<>(region, favoredNodes)); + } + + // Move on to open regions. + try { + // Send OPEN RPC. If it fails on a IOE or RemoteException, + // regions will be assigned individually. 
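+          // The loop below retries up to maximumAttempts. A destination that is still
+          // starting up, or a socket timeout against a server that is still online, does
+          // not consume an attempt (the counter is reset); a stopped region server aborts
+          // the bulk assign; any other IOException falls through to the outer catch, which
+          // forces the regions back to OFFLINE and gives up.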
+ Configuration conf = server.getConfiguration(); + long maxWaitTime = System.currentTimeMillis() + + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000); + for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) { + try { + List<RegionOpeningState> regionOpeningStateList = serverManager + .sendRegionOpen(destination, regionOpenInfos); + for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) { + RegionOpeningState openingState = regionOpeningStateList.get(k); + if (openingState != RegionOpeningState.OPENED) { + HRegionInfo region = regionOpenInfos.get(k).getFirst(); + LOG.info("Got opening state " + openingState + + ", will reassign later: " + region); + // Failed opening this region, reassign it later + forceRegionStateToOffline(region, true); + failedToOpenRegions.add(region); + } + } + break; + } catch (IOException e) { + if (e instanceof RemoteException) { + e = ((RemoteException)e).unwrapRemoteException(); + } + if (e instanceof RegionServerStoppedException) { + LOG.warn("The region server was shut down, ", e); + // No need to retry, the region server is a goner. + return false; + } else if (e instanceof ServerNotRunningYetException) { + long now = System.currentTimeMillis(); + if (now < maxWaitTime) { + if (LOG.isDebugEnabled()) { + LOG.debug("Server is not yet up; waiting up to " + + (maxWaitTime - now) + "ms", e); + } + Thread.sleep(100); + i--; // reset the try count + continue; + } + } else if (e instanceof java.net.SocketTimeoutException + && this.serverManager.isServerOnline(destination)) { + // In case socket is timed out and the region server is still online, + // the openRegion RPC could have been accepted by the server and + // just the response didn't go through. So we will retry to + // open the region on the same server. + if (LOG.isDebugEnabled()) { + LOG.debug("Bulk assigner openRegion() to " + destination + + " has timed out, but the regions might" + + " already be opened on it.", e); + } + // wait and reset the re-try count, server might be just busy. + Thread.sleep(100); + i--; + continue; + } else if (e instanceof FailedServerException && i < maximumAttempts) { + // In case the server is in the failed server list, no point to + // retry too soon. 
Retry after the failed_server_expiry time + long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); + if (LOG.isDebugEnabled()) { + LOG.debug(destination + " is on failed server list; waiting " + + sleepTime + "ms", e); + } + Thread.sleep(sleepTime); + continue; + } + throw e; + } + } + } catch (IOException e) { + // Can be a socket timeout, EOF, NoRouteToHost, etc + LOG.info("Unable to communicate with " + destination + + " in order to assign regions, ", e); + for (RegionState state: states) { + HRegionInfo region = state.getRegion(); + forceRegionStateToOffline(region, true); + } + return false; + } + } finally { + for (Lock lock : locks.values()) { + lock.unlock(); + } + } + + if (!failedToOpenRegions.isEmpty()) { + for (HRegionInfo region : failedToOpenRegions) { + if (!regionStates.isRegionOnline(region)) { + invokeAssign(region); + } + } + } + + // wait for assignment completion + ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions.size()); + for (HRegionInfo region: regions) { + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } + } + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } + LOG.debug("Bulk assigning done for " + destination); + return true; + } finally { + metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime); + } + } + + /** + * Send CLOSE RPC if the server is online, otherwise, offline the region. + * + * The RPC will be sent only to the region sever found in the region state + * if it is passed in, otherwise, to the src server specified. If region + * state is not specified, we don't update region state at all, instead + * we just send the RPC call. This is useful for some cleanup without + * messing around the region states (see handleRegion, on region opened + * on an unexpected server scenario, for an example) + */ + private void unassign(final HRegionInfo region, + final ServerName server, final ServerName dest) { + for (int i = 1; i <= this.maximumAttempts; i++) { + if (this.server.isStopped() || this.server.isAborted()) { + LOG.debug("Server stopped/aborted; skipping unassign of " + region); + return; + } + if (!serverManager.isServerOnline(server)) { + LOG.debug("Offline " + region.getRegionNameAsString() + + ", no need to unassign since it's on a dead server: " + server); + regionStates.updateRegionState(region, State.OFFLINE); + return; + } + try { + // Send CLOSE RPC + if (serverManager.sendRegionClose(server, region, dest)) { + LOG.debug("Sent CLOSE to " + server + " for region " + + region.getRegionNameAsString()); + return; + } + // This never happens. Currently regionserver close always return true. + // Todo; this can now happen (0.96) if there is an exception in a coprocessor + LOG.warn("Server " + server + " region CLOSE RPC returned false for " + + region.getRegionNameAsString()); + } catch (Throwable t) { + long sleepTime = 0; + Configuration conf = this.server.getConfiguration(); + if (t instanceof RemoteException) { + t = ((RemoteException)t).unwrapRemoteException(); + } + if (t instanceof RegionServerAbortedException + || t instanceof RegionServerStoppedException + || t instanceof ServerNotRunningYetException) { + // RS is aborting, we cannot offline the region since the region may need to do WAL + // recovery. Until we see the RS expiration, we should retry. 
+ sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); + + } else if (t instanceof NotServingRegionException) { + LOG.debug("Offline " + region.getRegionNameAsString() + + ", it's not any more on " + server, t); + regionStates.updateRegionState(region, State.OFFLINE); + return; + } else if (t instanceof FailedServerException && i < maximumAttempts) { + // In case the server is in the failed server list, no point to + // retry too soon. Retry after the failed_server_expiry time + sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); + if (LOG.isDebugEnabled()) { + LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t); + } + } + try { + if (sleepTime > 0) { + Thread.sleep(sleepTime); + } + } catch (InterruptedException ie) { + LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie); + Thread.currentThread().interrupt(); + regionStates.updateRegionState(region, State.FAILED_CLOSE); + return; + } + LOG.info("Server " + server + " returned " + t + " for " + + region.getRegionNameAsString() + ", try=" + i + + " of " + this.maximumAttempts, t); + } + } + // Run out of attempts + regionStates.updateRegionState(region, State.FAILED_CLOSE); + } + + /** + * Set region to OFFLINE unless it is opening and forceNewPlan is false. + */ + private RegionState forceRegionStateToOffline( + final HRegionInfo region, final boolean forceNewPlan) { + RegionState state = regionStates.getRegionState(region); + if (state == null) { + LOG.warn("Assigning but not in region states: " + region); + state = regionStates.createRegionState(region); + } + + if (forceNewPlan && LOG.isDebugEnabled()) { + LOG.debug("Force region state offline " + state); + } + + switch (state.getState()) { + case OPEN: + case OPENING: + case PENDING_OPEN: + case CLOSING: + case PENDING_CLOSE: + if (!forceNewPlan) { + LOG.debug("Skip assigning " + + region + ", it is already " + state); + return null; + } + case FAILED_CLOSE: + case FAILED_OPEN: + regionStates.updateRegionState(region, State.PENDING_CLOSE); + unassign(region, state.getServerName(), null); + state = regionStates.getRegionState(region); + if (!state.isOffline() && !state.isClosed()) { + // If the region isn't offline, we can't re-assign + // it now. It will be assigned automatically after + // the regionserver reports it's closed. + return null; + } + case OFFLINE: + case CLOSED: + break; + default: + LOG.error("Trying to assign region " + region + + ", which is " + state); + return null; + } + return state; + } + + /** + * Caller must hold lock on the passed <code>state</code> object. 
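+   * Retries up to "hbase.assignment.maximum.attempts" times; for the hbase:meta region
+   * the attempt counter is reset so the assignment keeps being retried until it succeeds.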
+ * @param state + * @param forceNewPlan + */ + private void assign(RegionState state, boolean forceNewPlan) { + long startTime = EnvironmentEdgeManager.currentTime(); + try { + Configuration conf = server.getConfiguration(); + RegionPlan plan = null; + long maxWaitTime = -1; + HRegionInfo region = state.getRegion(); + Throwable previousException = null; + for (int i = 1; i <= maximumAttempts; i++) { + if (server.isStopped() || server.isAborted()) { + LOG.info("Skip assigning " + region.getRegionNameAsString() + + ", the server is stopped/aborted"); + return; + } + + if (plan == null) { // Get a server for the region at first + try { + plan = getRegionPlan(region, forceNewPlan); + } catch (HBaseIOException e) { + LOG.warn("Failed to get region plan", e); + } + } + + if (plan == null) { + LOG.warn("Unable to determine a plan to assign " + region); + + // For meta region, we have to keep retrying until succeeding + if (region.isMetaRegion()) { + if (i == maximumAttempts) { + i = 0; // re-set attempt count to 0 for at least 1 retry + + LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region + + " after maximumAttempts (" + this.maximumAttempts + + "). Reset attempts count and continue retrying."); + } + waitForRetryingMetaAssignment(); + continue; + } + + regionStates.updateRegionState(region, State.FAILED_OPEN); + return; + } + LOG.info("Assigning " + region.getRegionNameAsString() + + " to " + plan.getDestination()); + // Transition RegionState to PENDING_OPEN + regionStates.updateRegionState(region, + State.PENDING_OPEN, plan.getDestination()); + + boolean needNewPlan = false; + final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() + + " to " + plan.getDestination(); + try { + List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; + if (shouldAssignFavoredNodes(region)) { + favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region); + } + serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes); + return; // we're done + } catch (Throwable t) { + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + previousException = t; + + // Should we wait a little before retrying? If the server is starting it's yes. + boolean hold = (t instanceof ServerNotRunningYetException); + + // In case socket is timed out and the region server is still online, + // the openRegion RPC could have been accepted by the server and + // just the response didn't go through. So we will retry to + // open the region on the same server. 
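+          // Neither case consumes an attempt: a starting server ("hold") and a timed-out
+          // RPC to a live server ("retry") both decrement i before looping again, the
+          // former only until the configured startup wait time has expired.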
+ boolean retry = !hold && (t instanceof java.net.SocketTimeoutException + && this.serverManager.isServerOnline(plan.getDestination())); + + if (hold) { + LOG.warn(assignMsg + ", waiting a little before trying on the same region server " + + "try=" + i + " of " + this.maximumAttempts, t); + + if (maxWaitTime < 0) { + maxWaitTime = EnvironmentEdgeManager.currentTime() + + this.server.getConfiguration().getLong( + "hbase.regionserver.rpc.startup.waittime", 60000); + } + try { + long now = EnvironmentEdgeManager.currentTime(); + if (now < maxWaitTime) { + if (LOG.isDebugEnabled()) { + LOG.debug("Server is not yet up; waiting up to " + + (maxWaitTime - now) + "ms", t); + } + Thread.sleep(100); + i--; // reset the try count + } else { + LOG.debug("Server is not up for a while; try a new one", t); + needNewPlan = true; + } + } catch (InterruptedException ie) { + LOG.warn("Failed to assign " + + region.getRegionNameAsString() + " since interrupted", ie); + regionStates.updateRegionState(region, State.FAILED_OPEN); + Thread.currentThread().interrupt(); + return; + } + } else if (retry) { + i--; // we want to retry as many times as needed as long as the RS is not dead. + if (LOG.isDebugEnabled()) { + LOG.debug(assignMsg + ", trying to assign to the same region server due ", t); + } + } else { + needNewPlan = true; + LOG.warn(assignMsg + ", trying to assign elsewhere instead;" + + " try=" + i + " of " + this.maximumAttempts, t); + } + } + + if (i == this.maximumAttempts) { + // For meta region, we have to keep retrying until succeeding + if (region.isMetaRegion()) { + i = 0; // re-set attempt count to 0 for at least 1 retry + LOG.warn(assignMsg + + ", trying to assign a hbase:meta region reached to maximumAttempts (" + + this.maximumAttempts + "). Reset attempt counts and continue retrying."); + waitForRetryingMetaAssignment(); + } + else { + // Don't reset the region state or get a new plan any more. + // This is the last try. + continue; + } + } + + // If region opened on destination of present plan, reassigning to new + // RS may cause double assignments. In case of RegionAlreadyInTransitionException + // reassigning to same RS. + if (needNewPlan) { + // Force a new plan and reassign. Will return null if no servers. + // The new plan could be the same as the existing plan since we don't + // exclude the server of the original plan, which should not be + // excluded since it could be the only server up now. + RegionPlan newPlan = null; + try { + newPlan = getRegionPlan(region, true); + } catch (HBaseIOException e) { + LOG.warn("Failed to get region plan", e); + } + if (newPlan == null) { + regionStates.updateRegionState(region, State.FAILED_OPEN); + LOG.warn("Unable to find a viable location to assign region " + + region.getRegionNameAsString()); + return; + } + + if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) { + // Clean out plan we failed execute and one that doesn't look like it'll + // succeed anyways; we need a new plan! 
+ // Transition back to OFFLINE + regionStates.updateRegionState(region, State.OFFLINE); + plan = newPlan; + } else if(plan.getDestination().equals(newPlan.getDestination()) && + previousException instanceof FailedServerException) { + try { + LOG.info("Trying to re-assign " + region.getRegionNameAsString() + + " to the same failed server."); + Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + RpcClient.FAILED_SERVER_EXPIRY_DEFAULT)); + } catch (InterruptedException ie) { + LOG.warn("Failed to assign " + + region.getRegionNameAsString() + " since interrupted", ie); + regionStates.updateRegionState(region, State.FAILED_OPEN); + Thread.currentThread().interrupt(); + return; + } + } + } + } + // Run out of attempts + regionStates.updateRegionState(region, State.FAILED_OPEN); + } finally { + metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime); + } + } + + private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { + if (this.tableStateManager.isTableState(region.getTable(), + TableState.State.DISABLED, + TableState.State.DISABLING) || replicasToClose.contains(region)) { + LOG.info("Table " + region.getTable() + " is disabled or disabling;" + + " skipping assign of " + region.getRegionNameAsString()); + offlineDisabledRegion(region); + return true; + } + return false; + } + + /** + * @param region the region to assign + * @param forceNewPlan If true, then if an existing plan exists, a new plan + * will be generated. + * @return Plan for passed <code>region</code> (If none currently, it creates one or + * if no servers to assign, it returns null). + */ + private RegionPlan getRegionPlan(final HRegionInfo region, + final boolean forceNewPlan) throws HBaseIOException { + // Pickup existing plan or make a new one + final String encodedName = region.getEncodedName(); + final List<ServerName> destServers = + serverManager.createDestinationServersList(); + + if (destServers.isEmpty()){ + LOG.warn("Can't move " + encodedName + + ", there is no destination server available."); + return null; + } + + RegionPlan randomPlan = null; + boolean newPlan = false; + RegionPlan existingPlan; + + synchronized (this.regionPlans) { + existingPlan = this.regionPlans.get(encodedName); + + if (existingPlan != null && existingPlan.getDestination() != null) { + LOG.debug("Found an existing plan for " + region.getRegionNameAsString() + + " destination server is " + existingPlan.getDestination() + + " accepted as a dest server = " + destServers.contains(existingPlan.getDestination())); + } + + if (forceNewPlan + || existingPlan == null + || existingPlan.getDestination() == null + || !destServers.contains(existingPlan.getDestination())) { + newPlan = true; + try { + randomPlan = new RegionPlan(region, null, + balancer.randomAssignment(region, destServers)); + } catch (IOException ex) { + LOG.warn("Failed to create new plan.",ex); + return null; + } + this.regionPlans.put(encodedName, randomPlan); + } + } + + if (newPlan) { + if (randomPlan.getDestination() == null) { + LOG.warn("Can't find a destination for " + encodedName); + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("No previous transition plan found (or ignoring " + + "an existing plan) for " + region.getRegionNameAsString() + + "; generated random plan=" + randomPlan + "; " + destServers.size() + + " (online=" + serverManager.getOnlineServers().size() + + ") available servers, forceNewPlan=" + forceNewPlan); + } + return randomPlan; + } + if (LOG.isDebugEnabled()) { + 
LOG.debug("Using pre-existing plan for " + + region.getRegionNameAsString() + "; plan=" + existingPlan); + } + return existingPlan; + } + + /** + * Wait for some time before retrying meta table region assignment + */ + private void waitForRetryingMetaAssignment() { + try { + Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment); + } catch (InterruptedException e) { + LOG.error("Got exception while waiting for hbase:meta assignment"); + Thread.currentThread().interrupt(); + } + } + + /** + * Unassigns the specified region. + * <p> + * Updates the RegionState and sends the CLOSE RPC unless region is being + * split by regionserver; then the unassign fails (silently) because we + * presume the region being unassigned no longer exists (its been split out + * of existence). TODO: What to do if split fails and is rolled back and + * parent is revivified? + * <p> + * If a RegionPlan is already set, it will remain. + * + * @param region server to be unassigned + */ + public void unassign(HRegionInfo region) { + unassign(region, null); + } + + + /** + * Unassigns the specified region. + * <p> + * Updates the RegionState and sends the CLOSE RPC unless region is being + * split by regionserver; then the unassign fails (silently) because we + * presume the region being unassigned no longer exists (its been split out + * of existence). TODO: What to do if split fails and is rolled back and + * parent is revivified? + * <p> + * If a RegionPlan is already set, it will remain. + * + * @param region server to be unassigned + * @param dest the destination server of the region + */ + public void unassign(HRegionInfo region, ServerName dest) { + // TODO: Method needs refactoring. Ugly buried returns throughout. Beware! + LOG.debug("Starting unassign of " + region.getRegionNameAsString() + + " (offlining), current state: " + regionStates.getRegionState(region)); + + String encodedName = region.getEncodedName(); + // Grab the state of this region and synchronize on it + // We need a lock here as we're going to do a put later and we don't want multiple states + // creation + ReentrantLock lock = locker.acquireLock(encodedName); + RegionState state = regionStates.getRegionTransitionState(encodedName); + try { + if (state == null || state.isFailedClose()) { + if (state == null) { + // Region is not in transition. + // We can unassign it only if it's not SPLIT/MERGED. + state = regionStates.getRegionState(encodedName); + if (state != null && state.isUnassignable()) { + LOG.info("Attempting to unassign " + state + ", ignored"); + // Offline region will be reassigned below + return; + } + if (state == null || state.getServerName() == null) { + // We don't know where the region is, offline it. + // No need to send CLOSE RPC + LOG.warn("Attempting to unassign a region not in RegionStates " + + region.getRegionNameAsString() + ", offlined"); + regionOffline(region); + return; + } + } + state = regionStates.updateRegionState( + region, State.PENDING_CLOSE); + } else if (state.isFailedOpen()) { + // The region is not open yet + regionOffline(region); + return; + } else { + LOG.debug("Attempting to unassign " + + region.getRegionNameAsString() + " but it is " + + "already in transition (" + state.getState()); + return; + } + + unassign(region, state.getServerName(), dest); + } finally { + lock.unlock(); + + // Region is expected to be reassigned afterwards + if (!replicasToClose.contains(region) + && regionStates.isRegionInState(region, State.OFFLINE)) { + assign(region); + } + } + } + + /** + * Used by unit tests. 
Return the number of regions opened so far in the life + * of the master. Increases by one every time the master opens a region + * @return the counter value of the number of regions opened so far + */ + public int getNumRegionsOpened() { + return numRegionsOpened.get(); + } + + /** + * Waits until the specified region has completed assignment. + * <p> + * If the region is already assigned, returns immediately. Otherwise, method + * blocks until the region is assigned. + * @param regionInfo region to wait on assignment for + * @return true if the region is assigned false otherwise. + * @throws InterruptedException + */ + public boolean waitForAssignment(HRegionInfo regionInfo) + throws InterruptedException { + ArrayList<HRegionInfo> regionSet = new ArrayList<>(1); + regionSet.add(regionInfo); + return waitForAssignment(regionSet, true, Long.MAX_VALUE); + } + + /** + * Waits until the specified region has completed assignment, or the deadline is reached. + */ + protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet, + final boolean waitTillAllAssigned, final int reassigningRegions, + final long minEndTime) throws InterruptedException { + long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1); + if (deadline < 0) { // Overflow + deadline = Long.MAX_VALUE; // wait forever + } + return waitForAssignment(regionSet, waitTillAllAssigned, deadline); + } + + /** + * Waits until the specified region has completed assignment, or the deadline is reached. + * @param regionSet set of region to wait on. the set is modified and the assigned regions removed + * @param waitTillAllAssigned true if we should wait all the regions to be assigned + * @param deadline the timestamp after which the wait is aborted + * @return true if all the regions are assigned false otherwise. + * @throws InterruptedException + */ + protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet, + final boolean waitTillAllAssigned, final long deadline) throws InterruptedException { + // We're not synchronizing on regionsInTransition now because we don't use any iterator. + while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) { + int failedOpenCount = 0; + Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator(); + while (regionInfoIterator.hasNext()) { + HRegionInfo hri = regionInfoIterator.next(); + if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, + State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { + regionInfoIterator.remove(); + } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) { + failedOpenCount++; + } + } + if (!waitTillAllAssigned) { + // No need to wait, let assignment going on asynchronously + break; + } + if (!regionSet.isEmpty()) { + if (failedOpenCount == regionSet.size()) { + // all the regions we are waiting had an error on open. + break; + } + regionStates.waitForUpdate(100); + } + } + return regionSet.isEmpty(); + } + + /** + * Assigns the hbase:meta region or a replica. + * <p> + * Assumes that hbase:meta is currently closed and is not being actively served by + * any RegionServer. + * @param hri TODO + */ + public void assignMeta(HRegionInfo hri) throws KeeperException { + regionStates.updateRegionState(hri, State.OFFLINE); + assign(hri); + } + + /** + * Assigns specified regions retaining assignments, if any. + * <p> + * This is a synchronous call and will return once every region has been + * assigned. 
If anything fails, an exception is thrown + * @throws InterruptedException + * @throws IOException + */ + public void assign(Map<HRegionInfo, ServerName> regions) + throws IOException, InterruptedException { + if (regions == null || regions.isEmpty()) { + return; + } + List<ServerName> servers = serverManager.createDestinationServersList(); + if (servers == null || servers.isEmpty()) { + throw new IOException("Found no destination server to assign region(s)"); + } + + // Reuse existing assignment info + Map<ServerName, List<HRegionInfo>> bulkPlan = + balancer.retainAssignment(regions, servers); + if (bulkPlan == null) { + throw new IOException("Unable to determine a plan to assign region(s)"); + } + + processBogusAssignments(bulkPlan); + + assign(regions.size(), servers.size(), + "retainAssignment=true", bulkPlan); + } + + /** + * Assigns specified regions round robin, if any. + * <p> + * This is a synchronous call and will return once every region has been + * assigned. If anything fails, an exception is thrown + * @throws InterruptedException + * @throws IOException + */ + public void assign(List<HRegionInfo> regions) + throws IOException, InterruptedException { + if (regions == null || regions.isEmpty()) { + return; + } + + List<ServerName> servers = serverManager.createDestinationServersList(); + if (servers == null || servers.isEmpty()) { + throw new IOException("Found no destination server to assign region(s)"); + } + + // Generate a round-robin bulk assignment plan + Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers); + if (bulkPlan == null) { + throw new IOException("Unable to determine a plan to assign region(s)"); + } + + processBogusAssignments(bulkPlan); + + assign(regions.size(), servers.size(), "round-robin=true", bulkPlan); + } + + private void assign(int regions, int totalServers, + String message, Map<ServerName, List<HRegionInfo>> bulkPlan) + throws InterruptedException, IOException { + + int servers = bulkPlan.size(); + if (servers == 1 || (regions < bulkAssignThresholdRegions + && servers < bulkAssignThresholdServers)) { + + // Not use bulk assignment. This could be more efficient in small + // cluster, especially mini cluster for testing, so that tests won't time out + if (LOG.isTraceEnabled()) { + LOG.trace("Not using bulk assignment since we are assigning only " + regions + + " region(s) to " + servers + " server(s)"); + } + + // invoke assignment (async) + ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions); + for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) { + if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) { + for (HRegionInfo region: plan.getValue()) { + if (!regionStates.isRegionOnline(region)) { + invokeAssign(region); + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } + } + } + } + } + + // wait for assignment completion + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } + } else { + LOG.info("Bulk assigning " + regions + " region(s) across " + + totalServers + " server(s), " + message); + + // Use fixed count thread pool assigning. + BulkAssigner ba = new GeneralBulkAssigner( + this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned); + ba.bulkAssign(); + LOG.info("Bulk assigning done"); + } + } + + /** + * Assigns all user regions, if any exist. Used during cluster startup. 
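+   * Whether existing assignments are retained or a fresh round-robin plan is used is
+   * controlled by "hbase.master.startup.retainassign" (default true).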
+ * <p> + * This is a synchronous call and will return once every region has been + * assigned. If anything fails, an exception is thrown and the cluster + * should be shut down. + * @throws InterruptedException + * @throws IOException + */ + private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions) + throws IOException, InterruptedException { + if (allRegions == null || allRegions.isEmpty()) return; + + // Determine what type of assignment to do on startup + boolean retainAssignment = server.getConfiguration(). + getBoolean("hbase.master.startup.retainassign", true); + + Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet(); + if (retainAssignment) { + assign(allRegions); + } else { + List<HRegionInfo> regions = new ArrayList<>(regionsFromMetaScan); + assign(regions); + } + + for (HRegionInfo hri : regionsFromMetaScan) { + TableName tableName = hri.getTable(); + if (!tableStateManager.isTableState(tableName, + TableState.State.ENABLED)) { + setEnabledTable(tableName); + } + } + // assign all the replicas that were not recorded in the meta + assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server)); + } + + /** + * Get the number of replicas of a table + */ + private static int getNumReplicas(MasterServices master, TableName table) { + int numReplica = 1; + try { + HTableDescriptor htd = master.getTableDescriptors().get(table); + if (htd == null) { + LOG.warn("master cannot get TableDescriptor from table '" + table + "'"); + } else { + numReplica = htd.getRegionReplication(); + } + } catch (IOException e) { + LOG.warn("Couldn't get the replication attribute of the table " + table + " due to " + + e.getMessage()); + } + return numReplica; + } + + /** + * Get a list of replica regions that are not recorded in meta yet. We might not have + * recorded the locations for the replicas since the replicas may not have been online yet, + * the master restarted in the middle of assigning, ZK was erased, etc. + * @param regionsRecordedInMeta the list of regions we know are recorded in meta + * either as a default, or as the location of a replica + * @param master + * @return list of replica regions + * @throws IOException + */ + public static List<HRegionInfo> replicaRegionsNotRecordedInMeta( + Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException { + List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<>(); + for (HRegionInfo hri : regionsRecordedInMeta) { + TableName table = hri.getTable(); + if (master.getTableDescriptors().get(table) == null) + continue; + int desiredRegionReplication = getNumReplicas(master, table); + for (int i = 0; i < desiredRegionReplication; i++) { + HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i); + if (regionsRecordedInMeta.contains(replica)) continue; + regionsNotRecordedInMeta.add(replica); + } + } + return regionsNotRecordedInMeta; + } + + /** + * Rebuild the list of user regions and assignment information. + * Updates regionStates with findings as we go through the list of regions.
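 + * Replicas of merged or split parent regions discovered during the scan are added to
 + * {@link #replicasToClose} so they can be cleaned up later.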
+ * @return set of servers not online that hosted some regions according to a scan of hbase:meta + * @throws IOException + */ + Set<ServerName> rebuildUserRegions() throws + IOException, KeeperException { + Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates( + TableState.State.DISABLED, TableState.State.ENABLING); + + Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates( + TableState.State.DISABLED, + TableState.State.DISABLING, + TableState.State.ENABLING); + + // Region assignment from META + List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection()); + // Get any new but slow to check in region server that joined the cluster + Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet(); + // Set of offline servers to be returned + Set<ServerName> offlineServers = new HashSet<>(); + // Iterate regions in META + for (Result result : results) { + if (result == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("null result from meta - ignoring but this is strange."); + } + continue; + } + // Keep track of replicas to close. These were the replicas of the regions that were + // merged; the master should have closed them already, but it might not have, + // perhaps because it crashed. + PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result); + if (p.getFirst() != null && p.getSecond() != null) { + int numReplicas = getNumReplicas(server, p.getFirst().getTable()); + for (HRegionInfo merge : p) { + for (int i = 1; i < numReplicas; i++) { + replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i)); + } + } + } + RegionLocations rl = MetaTableAccessor.getRegionLocations(result); + if (rl == null) { + continue; + } + HRegionLocation[] locations = rl.getRegionLocations(); + if (locations == null) { + continue; + } + for (HRegionLocation hrl : locations) { + if (hrl == null) continue; + HRegionInfo regionInfo = hrl.getRegionInfo(); + if (regionInfo == null) continue; + int replicaId = regionInfo.getReplicaId(); + State state = RegionStateStore.getRegionState(result, replicaId); + // Keep track of replicas to close. These were the replicas of the split parents + // from the previous life of the master.
The master should have closed them before + // but it couldn't maybe because it crashed + if (replicaId == 0 && state.equals(State.SPLIT)) { + for (HRegionLocation h : locations) { + replicasToClose.add(h.getRegionInfo()); + } + } + ServerName lastHost = hrl.getServerName(); + ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId); + regionStates.createRegionState(regionInfo, state, regionLocation, lastHost); + if (!regionStates.isRegionInState(regionInfo, State.OPEN)) { + // Region is not open (either offline or in transition), skip + continue; + } + TableName tableName = regionInfo.getTable(); + if (!onlineServers.contains(regionLocation)) { + // Region is located on a server that isn't online + offlineServers.add(regionLocation); + } else if (!disabledOrEnablingTables.contains(tableName)) { + // Region is being served and on an active server + // add only if region not in disabled or enabling table + regionStates.regionOnline(regionInfo, regionLocation); + balancer.regionOnline(regionInfo, regionLocation); + } + // need to enable the table if not disabled or disabling or enabling + // this will be used in rolling restarts + if (!disabledOrDisablingOrEnabling.contains(tableName) + && !getTableStateManager().isTableState(tableName, + TableState.State.ENABLED)) { + setEnabledTable(tableName); + } + } + } + return offlineServers; + } + + /** + * Processes list of regions in transition at startup + */ + void processRegionsInTransition(Collection<RegionState> regionsInTransition) { + // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions + // in case the RPC call is not sent out yet before the master was shut down + // since we update the state before we send the RPC call. We can't update + // the state after the RPC call. Otherwise, we don't know what's happened + // to the region if the master dies right after the RPC call is out. + for (RegionState regionState: regionsInTransition) { + LOG.info("Processing " + regionState); + ServerName serverName = regionState.getServerName(); + // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that + // case, try assigning it here. + if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) { + LOG.info("Server " + serverName + " isn't online. 
SSH will handle this"); + continue; // SSH will handle it + } + HRegionInfo regionInfo = regionState.getRegion(); + RegionState.State state = regionState.getState(); + switch (state) { + case CLOSED: + invokeAssign(regionState.getRegion()); + break; + case PENDING_OPEN: + retrySendRegionOpen(regionState); + break; + case PENDING_CLOSE: + retrySendRegionClose(regionState); + break; + case FAILED_CLOSE: + case FAILED_OPEN: + invokeUnAssign(regionInfo); + break; + default: + // No process for other states + break; + } + } + } + + /** + * At master failover, for pending_open region, make sure + * sendRegionOpen RPC call is sent to the target regionserver + */ + private void retrySendRegionOpen(final RegionState regionState) { + this.executorService.submit( + new EventHandler(server, EventType.M_MASTER_RECOVERY) { + @Override + public void process() throws IOException { + HRegionInfo hri = regionState.getRegion(); + ServerName serverName = regionState.getServerName(); + ReentrantLock lock = locker.acquireLock(hri.getEncodedName()); + try { + for (int i = 1; i <= maximumAttempts; i++) { + if (!serverManager.isServerOnline(serverName) + || server.isStopped() || server.isAborted()) { + return; // No need any more + } + try { + if (!regionState.equals(regionStates.getRegionState(hri))) { + return; // Region is not in the expected state any more + } + List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; + if (shouldAssignFavoredNodes(hri)) { + FavoredNodesManager fnm = ((MasterServices)server).getFavoredNodesManager(); + favoredNodes = fnm.getFavoredNodesWithDNPort(hri); + } + serverManager.sendRegionOpen(serverName, hri, favoredNodes); + return; // we're done + } catch (Throwable t) { + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof FailedServerException && i < maximumAttempts) { + // In case the server is in the failed server list, no point to + // retry too soon. 
Retry after the failed_server_expiry time + try { + Configuration conf = this.server.getConfiguration(); + long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); + if (LOG.isDebugEnabled()) { + LOG.debug(serverName + " is on failed server list; waiting " + + sleepTime + "ms", t); + } + Thread.sleep(sleepTime); + continue; + } catch (InterruptedException ie) { + LOG.warn("Failed to assign " + + hri.getRegionNameAsString() + " since interrupted", ie); + regionStates.updateRegionState(hri, State.FAILED_OPEN); + Thread.currentThread().interrupt(); + return; + } + } + if (serverManager.isServerOnline(serverName) + && t instanceof java.net.SocketTimeoutException) { + i--; // reset the try count + } else { + LOG.info("Got exception in retrying sendRegionOpen for " + + regionState + "; try=" + i + " of " + maximumAttempts, t); + } + Threads.sleep(100); + } + } + // Run out of attempts + regionStates.updateRegionState(hri, State.FAILED_OPEN); + } finally { + lock.unlock(); + } + } + }); + } + + /** + * At master failover, for pending_close region, make sure + * sendRegionClose RPC call is sent to the target regionserver + */ + private void retrySendRegionClose(final RegionState regionState) { + this.executorService.submit( + new EventHandler(server, EventType.M_MASTER_RECOVERY) { + @Override + public void process() throws IOException { + HRegionInfo hri = regionState.getRegion(); + ServerName serverName = regionState.getServerName(); + ReentrantLock lock = locker.acquireLock(hri.getEncodedName()); + try { + for (int i = 1; i <= maximumAttempts; i++) { + if (!serverManager.isServerOnline(serverName) + || server.isStopped() || server.isAborted()) { + return; // No need any more + } + try { + if (!regionState.equals(regionStates.getRegionState(hri))) { + return; // Region is not in the expected state any more + } + serverManager.sendRegionClose(serverName, hri, null); + return; // Done. + } catch (Throwable t) { + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof FailedServerException && i < maximumAttempts) { + // In case the server is in the failed server list, no point to + // retry too soon. Retry after the failed_server_expiry time + try { + Configuration conf = this.server.getConfiguration(); + long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); + if (LOG.isDebugEnabled()) { + LOG.debug(serverName + " is on failed server list; waiting " + + sleepTime + "ms", t); + } + Thread.sleep(sleepTime); + continue; + } catch (InterruptedException ie) { + LOG.warn("Failed to unassign " + + hri.getRegionNameAsString() + " since interrupted", ie); + regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE); + Thread.currentThread().interrupt(); + return; + } + } + if (serverManager.isServerOnline(serverName) + && t instanceof java.net.SocketTimeoutException) { + i--; // reset the try count + } else { + LOG.info("Got exception in retrying sendRegionClose for " + + regionState + "; try=" + i + " of " + maximumAttempts, t); + } + Threads.sleep(100); + } + } + // Run out of attempts + regionStates.updateRegionState(hri, State.FAILED_CLOSE); + } finally { + lock.unlock(); + } + } + }); + } + + /** + * Set Regions in transitions metrics. + * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized. 
+ * This iterator is not fail fast, which may lead to stale read; but that's better than + * creating a copy of the map for metrics computation, as this method will be invoked + * on a frequent interval. + */ + public void updateRegionsInTransitionMetrics() { + long currentTime = System.currentTimeMillis(); + int totalRITs = 0; + int totalRITsOverThreshold = 0; + long oldestRITTime = 0; + int ritThreshold = this.server.getConfiguration(). + getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000); + for (RegionState state: regionStates.getRegionsInTransition()) { + totalRITs++; + long ritTime = currentTime - state.getStamp(); + if (ritTime > ritThreshold) { // more than the threshold + totalRITsOverThreshold++; + } + if (oldestRITTime < ritTime) { + oldestRITTime = ritTime; + } + } + if (this.metricsAssignmentManager != null) { + this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime); + this.metricsAssignmentManager.updateRITCount(totalRITs); + this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold); + } + } + + /** + * @param region Region whose plan we are to clear. + */ + private void clearRegionPlan(final HRegionInfo region) { + synchronized (this.regionPlans) { + this.regionPlans.remove(region.getEncodedName()); + } + } + + /** + * Wait on region to clear regions-in-transition. + * @param hri Region to wait on. + * @throws IOException + */ + public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri) + throws IOException, InterruptedException { + waitOnRegionToClearRegionsInTransition(hri, -1L); + } + + /** + * Wait on region to clear regions-in-transition or time out + * @param hri + * @param timeOut Milliseconds to wait for current region to be out of transition state. + * @return True when a region clears regions-in-transition before timeout otherwise false + * @throws InterruptedException + */ + public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut) + throws InterruptedException { + if (!regionStates.isRegionInTransition(hri)) { + return true; + } + long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime() + + timeOut; + // There is already a timeout monitor on regions in transition so I + // should not have to have one here too? 
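 + // The loop below polls regionStates every 100 ms until the region leaves transition,
 + // the deadline passes, or the master is stopped.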
+ LOG.info("Waiting for " + hri.getEncodedName() + + " to leave regions-in-transition, timeOut=" + timeOut + " ms."); + while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) { + regionStates.waitForUpdate(100); + if (EnvironmentEdgeManager.currentTime() > end) { + LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned."); + return false; + } + } + if (this.server.isStopped()) { + LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set"); + return false; + } + return true; + } + + void invokeAssign(HRegionInfo regionInfo) { + threadPoolExecutorService.submit(new AssignCallable(this, regionInfo)); + } + + void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) { + scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable( + new AssignCallable(this, regionInfo)), sleepMillis, TimeUnit.MILLISECONDS); + } + + void invokeUnAssign(HRegionInfo regionInfo) { + threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo)); + } + + public boolean isCarryingMeta(ServerName serverName) { + return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO); + } + + public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) { + return isCarryingRegion(serverName, + RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId)); + } + + public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) { + return isCarryingRegion(serverName, metaHri); + } + + /** + * Check if the shutdown server carries the specific region. + * @return whether the serverName currently hosts the region + */ + private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) { + RegionState regionState = regionStates.getRegionTransitionState(hri); + ServerName transitionAddr = regionState != null? regionState.getServerName(): null; + if (transitionAddr != null) { + boolean matchTransitionAddr = transitionAddr.equals(serverName); + LOG.debug("Checking region=" + hri.getRegionNameAsString() + + ", transitioning on server=" + matchTransitionAddr + + " server being checked: " + serverName + + ", matches=" + matchTransitionAddr); + return matchTransitionAddr; + } + + ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri); + boolean matchAssignedAddr = serverName.equals(assignedAddr); + LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() + + " is on server=" + assignedAddr + ", server being checked: " + + serverName); + return matchAssignedAddr; + } + + /** + * Clean out crashed server removing any assignments. + * @param sn Server that went down. + * @return list of regions in transition on this server + */ + public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) { + // Clean out any existing assignment plans for this server + synchronized (this.regionPlans) { + for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator(); + i.hasNext();) { + Map.Entry<String, RegionPlan> e = i.next(); + ServerName otherSn = e.getValue().getDestination(); + // The name will be null if the region is planned for a random assign. 
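 + // Only plans whose destination is the crashed server are removed; plans targeting
 + // other servers remain valid.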
+ if (otherSn != null && otherSn.equals(sn)) { + // Use iterator's remove else we'll get CME + i.remove(); + } + } + } + List<HRegionInfo> rits = regionStates.serverOffline(sn); + for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) { + HRegionInfo hri = it.next(); + String encodedName = hri.getEncodedName(); + + // We need a lock on the region as we could update it + Lock lock = locker.acquireLock(encodedName); + try { + RegionState regionState = regionStates.getRegionTransitionState(encodedName); + if (regionState == null + || (regionState.getServerName() != null && !regionState.isOnServer(sn)) + || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN, + State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) { + LOG.info("Skip " + regionState + " since it is not opening/failed_close" + + " on the dead server any more: " + sn); + it.remove(); + } else { + if (tab
<TRUNCATED>