http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java new file mode 100644 index 0000000..f1c1a40 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -0,0 +1,1792 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.PleaseHoldException; +import org.apache.hadoop.hbase.RegionException; +import org.apache.hadoop.hbase.RegionStateListener; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; +import org.apache.hadoop.hbase.favored.FavoredNodeLoadBalancer; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; +import org.apache.hadoop.hbase.master.AssignmentListener; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.MetricsAssignmentManager; +import org.apache.hadoop.hbase.master.NoSuchProcedureException; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionState.State; +import 
org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.master.TableStateManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState; +import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode; +// TODO: why are they here? +import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.quotas.QuotaExceededException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The AssignmentManager is the coordinator for region assign/unassign operations. 
+ * <ul> + * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> + * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> + * </ul> + * Regions are created by CreateTable, Split, Merge. + * Regions are deleted by DeleteTable, Split, Merge. + * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. + * Unassigns are triggered by DisableTable, Split, Merge + */ +@InterfaceAudience.Private +public class AssignmentManager implements ServerListener { + private static final Log LOG = LogFactory.getLog(AssignmentManager.class); + + // TODO: AMv2 + // - handle region migration from hbase1 to hbase2. + // - handle sys table assignment first (e.g. acl, namespace) + // - handle table priorities <= IS THIS DONE? + // - If ServerBusyException trying to update hbase:meta, we abort the Master + // See updateRegionLocation in RegionStateStore. + // + // Split and Merge are done differently. Split has flags on HRI. Merge does not. + // We delete regions from hbase:meta when we finish merge procedure. We don't do + // same when we split. REVIEW!! Yeah, this is incomplete. Needs finishing. + // Also, split is done by asking the RS which asks the Master to do the job. + // Undo. Remove all the RS-side classes that have support for merge/split; + // not needed anymore. + // + // We seem to update RegionStates -- i.e. in-memory view first and then + // update hbase:meta. What about crashes? We OK? TODO. + // + // Review rollbacks for all procedures. We can rollback a subprocedure even + // if it succeeds. Does this make sense in all cases? (Is there a case where + // we'd roll back a successful assign?) + // + // I disabled testMergeWithReplicas in TestRegionMergeTransactionOnCluster + // because don't know how it is supposed to work. TODO. + // + // TODO: The odd time we want to set a ServerName to 'unset' or null. This + // is not allowed. Setting null into zk or into protobuf fails. 
Make + // a ServerNode.EMPTY and check for it everywhere? What about clients? Do we + // want to set null ServerName ever? What is an old client to do when it sees + // an empty ServerName? + // + // TODO: Admin#close with ServerName does not update hbase:meta so Master thinks + // region still assigned. TODO: Tell Master. This is a problem in + // testHBaseFsckWithFewerMetaReplicaZnodes in TestMetaWithReplicas and a few other + // tests. + // + // TODO: Unassign is implemented but it's supposed to be renamed reassign? + // + // TODO: A region in FAILED ASSIGN STATE, how to alert on this? Metric? + // + // TODO: ProcedureSyncWait REMOVE + // * Helper to synchronously wait on conditions. + // * This will be removed in the future (mainly when the AssignmentManager will be + // * replaced with a Procedure version) by using ProcedureYieldException, + // * and the queue will handle waiting and scheduling based on events. + // + // TestCloneSnapshotFromClient and its Mob subclasses seem flaky. + // + // TODO: Disabled/Ignore TestRSGroupsOfflineMode#testOffline; need to dig in on what offline is. + // TODO: Disabled/Ignore TestRSGroups. + // + // TODO: Disabled fsck tests: TestHBaseFsckTwoRS, TestOfflineMetaRebuildBase + // TestHBaseFsckReplicas, TestOfflineMetaRebuildOverlap, testChangingReplicaCount in + // TestMetaWithReplicas (internally it is doing fscks which are killing RS), + // + // TODO: TestRegionRebalancing is disabled because it doesn't consider the fact + // that Master carries system tables only (fix of average in RegionStates + // brought out the issue). + // + // Disabled parts of...testCreateTableWithMultipleReplicas in TestMasterOperationsForRegionReplicas + // There is an issue w/ assigning more replicas if number of replicas is changed on us. + // See '/* DISABLED!!!!! FOR NOW!!!!'. + // + // Disabled TestCorruptedRegionStoreFile + // Depends on a half-implemented reopen of a region when a store file goes missing; TODO.
+ // + // testRetainAssignmentOnRestart in TestRestartCluster does not work. AMv2 does retain + // semantics differently. Fix. TODO. + // + // TODO: TestMasterFailover needs to be rewritten for AMv2. It uses tricks not ordained + // when up on AMv2. The test is also hobbled by the fact that we religiously enforce that + // only master can carry meta, something we are loose about in old AM. + // + // TODO: TestMergeTableRegionsProcedure Fix. Disabled. + // + // TODO: Fix Ignores in TestServerCrashProcedure. Master is different now. + // + // Offlining is not what it was. It makes region offline in master memory state. + // That is what it used to do... but this time the AMv2 will act on its state and + // if a regionserver reports in that the region is open on it, the master will tell + // it to shut down. This is what we want. + // Because of this disabled testOfflineRegion in TestAsyncRegionAdminApi + // + // FSCK test testHBaseFsckWithExcessMetaReplicas in TestMetaWithReplicas. + // So is testHBaseFsckWithFewerMetaReplicas in same class. + // + // Disabled testMetaAddressChange in TestMetaWithReplicas because it presumes you can + // move meta... you can't + // + // TODO: Skipping delete of table after test in TestAccessController3 + // because of access issues w/ AMv2. AMv1 seems to crash servers on exit too + // for same lack of auth perms but AMv2 gets hung up. TODO. See cleanUp method. + // FIX!!!! Good candidate for racing procs. + // + // TestHCM#testMulti and TestHCM + // + // TestHBaseFsckOneRS is fsck. Disabled. + // TestOfflineMetaRebuildHole is about rebuilding hole with fsck. + // + // TestAsyncTableGetMultiThreaded wants to move hbase:meta...Balancer does NPEs. + // AMv2 won't let you move hbase:meta off Master. + // + // Interesting issue around region replicas; split is trying to assign out replicas for + // new daughters but it takes a while to complete. In FN test, we move to disable the + // table and the disable messes up the open of the replicas; they hang.
Added a little
+  // timeout for now. Need to come back to it. See TestTableFavoredNodes#testSplitTable
+  //
+  // Fix TestMasterMetrics. Stuff is different now around startup which messes up
+  // this test. Disabled two of three tests.
+  //
+  // I tried to fix TestMasterBalanceThrottling but it looks like SimpleLoadBalancer
+  // is borked whether AMv2 or not.
+
+
+  /**
+   * Configuration key for the bootstrap thread pool size.
+   * NOTE(review): no reader of this key is visible in this part of the file; confirm usage.
+   */
+  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
+      "hbase.assignment.bootstrap.thread.pool.size";
+
+  /** Configuration key the constructor reads into {@code assignDispatchWaitMillis}. */
+  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
+      "hbase.assignment.dispatch.wait.msec";
+  /** Value used when {@link #ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY} is not configured. */
+  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;
+
+  /** Configuration key the constructor reads into {@code assignDispatchWaitQueueMaxSize}. */
+  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
+      "hbase.assignment.dispatch.wait.queue.max.size";
+  /** Value used when {@link #ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY} is not configured. */
+  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;
+
+  /** Configuration key for the interval handed to the RegionInTransitionChore. */
+  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
+      "hbase.assignment.rit.chore.interval.msec";
+  /** Value used when {@link #RIT_CHORE_INTERVAL_MSEC_CONF_KEY} is not configured. */
+  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 5 * 1000;
+
+  /** Configuration key for the maximum assign attempts; the constructor clamps it to >= 1. */
+  public static final String ASSIGN_MAX_ATTEMPTS =
+      "hbase.assignment.maximum.attempts";
+  /** Value used when {@link #ASSIGN_MAX_ATTEMPTS} is not configured. */
+  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = 10;
+
+  /** Region in Transition metrics threshold time */
+  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
+      "hbase.metrics.rit.stuck.warning.threshold";
+  // NOTE(review): presumably milliseconds, like the other *_MSEC defaults -- confirm.
+  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
+
+  /** Event behind isMetaInitialized()/waitMetaInitialized(); toggled by setMetaInitialized(). */
+  private final ProcedureEvent<?> metaInitializedEvent = new ProcedureEvent<>("meta initialized");
+  /** Event behind isMetaLoaded()/waitMetaLoaded(); woken by wakeMetaLoadedEvent(). */
+  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
+
+  /**
+   * Indicator that AssignmentManager has recovered the region states so
+   * that ServerCrashProcedure can be fully enabled and re-assign regions
+   * of dead servers. So that when re-assignment happens, AssignmentManager
+   * has proper region states.
+   */
+  private final ProcedureEvent<?> failoverCleanupDone = new ProcedureEvent<>("failover cleanup");
+
+  /** Listeners that are called on assignment events. */
+  private final CopyOnWriteArrayList<AssignmentListener> listeners =
+      new CopyOnWriteArrayList<AssignmentListener>();
+
+  // TODO: why is this different from the listeners (carried over from the old AM)
+  private RegionStateListener regionStateListener;
+
+  private final MetricsAssignmentManager metrics;
+  private final RegionInTransitionChore ritChore;
+  private final MasterServices master;
+
+  /** Set to true by start() and back to false by stop(). */
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  /** In-memory region and server states. */
+  private final RegionStates regionStates = new RegionStates();
+  private final RegionStateStore regionStateStore;
+
+  /** True when the configured load balancer class is a FavoredNodeLoadBalancer. */
+  private final boolean shouldAssignRegionsWithFavoredNodes;
+  private final int assignDispatchWaitQueueMaxSize;
+  private final int assignDispatchWaitMillis;
+  /** Configured maximum assign attempts, never less than 1. */
+  private final int assignMaxAttempts;
+
+  private Thread assignThread;
+
+  /**
+   * Creates an AssignmentManager backed by a new {@link RegionStateStore} built from
+   * the given master.
+   * @param master the master services this manager works against
+   */
+  public AssignmentManager(final MasterServices master) {
+    this(master, new RegionStateStore(master));
+  }
+
+  /**
+   * Creates an AssignmentManager that uses the supplied region state store and reads its
+   * settings from the master's configuration.
+   * @param master the master services this manager works against
+   * @param stateStore the store kept as this manager's {@link RegionStateStore}
+   */
+  public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
+    this.master = master;
+    this.regionStateStore = stateStore;
+    this.metrics = new MetricsAssignmentManager();
+
+    final Configuration conf = master.getConfiguration();
+
+    // Only read favored nodes if using the favored nodes load balancer.
+    this.shouldAssignRegionsWithFavoredNodes = FavoredNodeLoadBalancer.class.isAssignableFrom(
+        conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
+
+    this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
+        DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
+    this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
+        DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
+
+    // Clamp so that at least one attempt is allowed whatever the configuration says.
+    this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS,
+        DEFAULT_ASSIGN_MAX_ATTEMPTS));
+
+    int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
+        DEFAULT_RIT_CHORE_INTERVAL_MSEC);
+    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
+  }
+
+  /**
+   * Starts the manager: registers it as a server listener, starts the region state store
+   * and starts the assignment thread. Returns immediately if the manager is already running.
+   * @throws IOException declared by this method; NOTE(review): presumably raised while
+   *   starting the region state store -- confirm
+   */
+  public void start() throws IOException {
+    // Only the caller that flips running from false to true performs the start-up work.
+    if (!running.compareAndSet(false, true)) {
+      return;
+    }
+
+    LOG.info("Starting assignment manager");
+
+    // Register Server Listener
+    master.getServerManager().registerListener(this);
+
+    // Start the RegionStateStore
+    regionStateStore.start();
+
+    // Start the Assignment Thread
+    startAssignmentThread();
+  }
+
+  /**
+   * Stops the manager: removes the RIT chore, stops the assignment thread, clears the
+   * in-memory region states, stops the region state store and unregisters the server
+   * listener. The chore removal and the final reset of the meta events only happen when
+   * the master has a procedure executor. Returns immediately if the manager is not running.
+   */
+  public void stop() {
+    // Only the caller that flips running from true to false performs the shutdown work.
+    if (!running.compareAndSet(true, false)) {
+      return;
+    }
+
+    LOG.info("Stopping assignment manager");
+
+    // The AM is started before the procedure executor,
+    // but the actual work will be loaded/submitted only once we have the executor
+    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
+
+    // Remove the RIT chore
+    if (hasProcExecutor) {
+      master.getMasterProcedureExecutor().removeChore(this.ritChore);
+    }
+
+    // Stop the Assignment Thread
+    stopAssignmentThread();
+
+    // Clear the in-memory states, then stop the RegionStateStore
+    regionStates.clear();
+    regionStateStore.stop();
+
+    // Unregister Server Listener
+    master.getServerManager().unregisterListener(this);
+
+    // Update meta events (for testing)
+    if (hasProcExecutor) {
+      getProcedureScheduler().suspendEvent(metaLoadEvent);
+      setFailoverCleanupDone(false);
+      for (HRegionInfo hri: getMetaRegionSet()) {
+        setMetaInitialized(hri, false);
+      }
+    }
+  }
+
+  /** @return true between a successful {@code start()} and the matching {@code stop()} */
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  /** @return the configuration of the master this manager was created with */
+  public Configuration getConfiguration() {
+    return master.getConfiguration();
+  }
+
+  /** @return the metrics object created in the constructor */
+  public MetricsAssignmentManager getAssignmentManagerMetrics() {
+    return metrics;
+  }
+
+  /** @return the master's load balancer */
+  private LoadBalancer getBalancer() {
+    return master.getLoadBalancer();
+  }
+
+  /** @return the environment of the master's procedure executor */
+  private MasterProcedureEnv getProcedureEnvironment() {
+    return master.getMasterProcedureExecutor().getEnvironment();
+  }
+
+  /** @return the procedure scheduler of the master procedure environment */
+  private MasterProcedureScheduler getProcedureScheduler() {
+    return getProcedureEnvironment().getProcedureScheduler();
+  }
+
+  /** @return the configured maximum number of assign attempts (at least 1) */
+  protected int getAssignMaxAttempts() {
+    return assignMaxAttempts;
+  }
+
+  /**
+   * Add the listener to the notification list.
+   * @param listener The AssignmentListener to register
+   */
+  public void registerListener(final AssignmentListener listener) {
+    this.listeners.add(listener);
+  }
+
+  /**
+   * Remove the listener from the notification list.
+   * @param listener The AssignmentListener to unregister
+   * @return true if the listener was in the list and has been removed
+   */
+  public boolean unregisterListener(final AssignmentListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  /**
+   * Sets the single RegionStateListener, replacing any previously set one.
+   * @param listener the listener to store; may be null (callers in this file null-check it)
+   */
+  public void setRegionStateListener(final RegionStateListener listener) {
+    this.regionStateListener = listener;
+  }
+
+  /** @return the in-memory region states held by this manager */
+  public RegionStates getRegionStates() {
+    return regionStates;
+  }
+
+  /** @return the region state store given at construction time */
+  public RegionStateStore getRegionStateStore() {
+    return regionStateStore;
+  }
+
+  /**
+   * Returns the favored nodes of a region.
+   * @param regionInfo the region to look up
+   * @return the balancer's favored nodes for the region when the favored-nodes balancer is
+   *   configured, otherwise {@code ServerName.EMPTY_SERVER_LIST}
+   */
+  public List<ServerName> getFavoredNodes(final HRegionInfo regionInfo) {
+    return this.shouldAssignRegionsWithFavoredNodes?
+      ((FavoredNodeLoadBalancer)getBalancer()).getFavoredNodes(regionInfo):
+      ServerName.EMPTY_SERVER_LIST;
+  }
+
+  // ============================================================================================
+  //  Table State Manager helpers
+  // ============================================================================================
+  /** @return the master's TableStateManager */
+  TableStateManager getTableStateManager() {
+    return master.getTableStateManager();
+  }
+
+  /**
+   * @param tableName the table to check
+   * @return the result of asking the TableStateManager whether the table is ENABLED
+   */
+  public boolean isTableEnabled(final TableName tableName) {
+    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
+  }
+
+  /**
+   * Checks the table against both the DISABLED and DISABLING states.
+   * NOTE(review): presumably true when the table is in either state -- confirm against
+   * TableStateManager#isTableState.
+   * @param tableName the table to check
+   */
+  public boolean isTableDisabled(final TableName tableName) {
+    return getTableStateManager().isTableState(tableName,
+      TableState.State.DISABLED, TableState.State.DISABLING);
+  }
+
+  // ============================================================================================
+  //  META Helpers
+  // ============================================================================================
+  /** @return {@code regionInfo.isMetaRegion()} */
+  private boolean isMetaRegion(final HRegionInfo regionInfo) {
+    return regionInfo.isMetaRegion();
+  }
+
+  /**
+   * @param regionName a region name
+   * @return true if the name matches one of the regions in {@link #getMetaRegionSet()}
+   */
+  public boolean isMetaRegion(final byte[] regionName) {
+    return getMetaRegionFromName(regionName) != null;
+  }
+
+  /**
+   * Looks up a meta region by region name.
+   * @param regionName the region name to match, compared with {@code Bytes.equals}
+   * @return the matching region from {@link #getMetaRegionSet()}, or null if none matches
+   */
+  public HRegionInfo getMetaRegionFromName(final byte[] regionName) {
+    for (HRegionInfo hri: getMetaRegionSet()) {
+      if (Bytes.equals(hri.getRegionName(), regionName)) {
+        return hri;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @param serverName the server to check
+   * @return true if any region in {@link #getMetaRegionSet()} is located on the server
+   */
+  public boolean isCarryingMeta(final ServerName serverName) {
+    for (HRegionInfo hri: getMetaRegionSet()) {
+      if (isCarryingRegion(serverName, hri)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @return true if the region has a node in the region states and that node's location
+   *   equals the given server; the region's state itself is not consulted
+   */
+  private boolean isCarryingRegion(final ServerName serverName, final HRegionInfo regionInfo) {
+    // TODO: check for state?
+    final RegionStateNode node = regionStates.getRegionNode(regionInfo);
+    return(node != null && serverName.equals(node.getRegionLocation()));
+  }
+
+  /**
+   * Returns the meta region responsible for the given region. Currently this is always
+   * {@code HRegionInfo.FIRST_META_REGIONINFO}; the argument is ignored.
+   */
+  private HRegionInfo getMetaForRegion(final HRegionInfo regionInfo) {
+    //if (regionInfo.isMetaRegion()) return regionInfo;
+    // TODO: handle multiple meta. if the region provided is not meta lookup
+    // which meta the region belongs to.
+    return HRegionInfo.FIRST_META_REGIONINFO;
+  }
+
+  // TODO: handle multiple meta.
+  /** Singleton set holding only {@code HRegionInfo.FIRST_META_REGIONINFO}. */
+  private static final Set<HRegionInfo> META_REGION_SET =
+      Collections.singleton(HRegionInfo.FIRST_META_REGIONINFO);
+  /** @return the shared single-element set of meta regions */
+  public Set<HRegionInfo> getMetaRegionSet() {
+    return META_REGION_SET;
+  }
+
+  // ============================================================================================
+  //  META Event(s) helpers
+  // ============================================================================================
+  /** @return true if the "meta initialized" event is ready */
+  public boolean isMetaInitialized() {
+    return metaInitializedEvent.isReady();
+  }
+
+  /** @return the negation of {@link #isMetaInitialized()} */
+  public boolean isMetaRegionInTransition() {
+    return !isMetaInitialized();
+  }
+
+  /**
+   * Makes the procedure wait on the initialized event of the first meta region.
+   * @param proc the procedure handed to the scheduler's waitEvent
+   * @return the value returned by the scheduler's waitEvent
+   */
+  public boolean waitMetaInitialized(final Procedure proc) {
+    // TODO: handle multiple meta. should this wait on all meta?
+    // this is used by the ServerCrashProcedure...
+    return waitMetaInitialized(proc, HRegionInfo.FIRST_META_REGIONINFO);
+  }
+
+  /**
+   * Makes the procedure wait on the initialized event of the meta region that covers
+   * {@code regionInfo}.
+   * @param proc the procedure handed to the scheduler's waitEvent
+   * @param regionInfo the region whose meta region is looked up via getMetaForRegion
+   * @return the value returned by the scheduler's waitEvent
+   *   (NOTE(review): presumably true when the procedure has to wait -- confirm)
+   */
+  public boolean waitMetaInitialized(final Procedure proc, final HRegionInfo regionInfo) {
+    return getProcedureScheduler().waitEvent(
+      getMetaInitializedEvent(getMetaForRegion(regionInfo)), proc);
+  }
+
+  /**
+   * Wakes (isInitialized == true) or suspends (false) the initialized event of the given
+   * meta region through the procedure scheduler.
+   */
+  private void setMetaInitialized(final HRegionInfo metaRegionInfo, final boolean isInitialized) {
+    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
+    final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo);
+    if (isInitialized) {
+      getProcedureScheduler().wakeEvent(metaInitEvent);
+    } else {
+      getProcedureScheduler().suspendEvent(metaInitEvent);
+    }
+  }
+
+  /** @return the single shared "meta initialized" event, whichever meta region is passed */
+  private ProcedureEvent getMetaInitializedEvent(final HRegionInfo metaRegionInfo) {
+    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
+    // TODO: handle multiple meta.
+    return metaInitializedEvent;
+  }
+
+  /**
+   * Makes the procedure wait on the "meta load" event.
+   * @return the value returned by the scheduler's waitEvent
+   */
+  public boolean waitMetaLoaded(final Procedure proc) {
+    return getProcedureScheduler().waitEvent(metaLoadEvent, proc);
+  }
+
+  /** Wakes the "meta load" event and asserts that it is ready afterwards. */
+  protected void wakeMetaLoadedEvent() {
+    getProcedureScheduler().wakeEvent(metaLoadEvent);
+    assert isMetaLoaded() : "expected meta to be loaded";
+  }
+
+  /** @return true if the "meta load" event is ready */
+  public boolean isMetaLoaded() {
+    return metaLoadEvent.isReady();
+  }
+
+  // ============================================================================================
+  //  TODO: Sync helpers
+  // ============================================================================================
+  /**
+   * Assigns the meta region without a target server; same as
+   * {@code assignMeta(metaRegionInfo, null)}.
+   */
+  public void assignMeta(final HRegionInfo metaRegionInfo) throws IOException {
+    assignMeta(metaRegionInfo, null);
+  }
+
+  /**
+   * Builds an AssignProcedure for the meta region and passes it to
+   * {@code ProcedureSyncWait.submitAndWaitProcedure}.
+   * @param metaRegionInfo the meta region to assign
+   * @param serverName target server; when null the procedure is created without a target
+   *   and with forceNewPlan set to false
+   */
+  public void assignMeta(final HRegionInfo metaRegionInfo, final ServerName serverName)
+      throws IOException {
+    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
+    AssignProcedure proc;
+    if (serverName != null) {
+      LOG.debug("Try assigning Meta " + metaRegionInfo + " to " + serverName);
+      proc =
createAssignProcedure(metaRegionInfo, serverName); + } else { + LOG.debug("Assigning " + metaRegionInfo.getRegionNameAsString()); + proc = createAssignProcedure(metaRegionInfo, false); + } + ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); + } + + public void assign(final HRegionInfo regionInfo) throws IOException { + assign(regionInfo, true); + } + + public void assign(final HRegionInfo regionInfo, final boolean forceNewPlan) throws IOException { + AssignProcedure proc = createAssignProcedure(regionInfo, forceNewPlan); + ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); + } + + public void unassign(final HRegionInfo regionInfo) throws IOException { + unassign(regionInfo, false); + } + + public void unassign(final HRegionInfo regionInfo, final boolean forceNewPlan) + throws IOException { + // TODO: rename this reassign + RegionStateNode node = this.regionStates.getRegionNode(regionInfo); + ServerName destinationServer = node.getRegionLocation(); + if (destinationServer == null) { + throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString()); + } + assert destinationServer != null; node.toString(); + UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan); + ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); + } + + public Future<byte[]> moveAsync(final RegionPlan regionPlan) { + MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan); + return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); + } + + @VisibleForTesting + public boolean waitForAssignment(final HRegionInfo regionInfo) throws IOException { + return waitForAssignment(regionInfo, Long.MAX_VALUE); + } + + @VisibleForTesting + // TODO: Remove this? 
+  public boolean waitForAssignment(final HRegionInfo regionInfo, final long timeout)
+      throws IOException {
+    RegionStateNode node = null;
+    // This method can be called before the regionInfo has made it into the regionStateMap
+    // so wait around here a while.
+    long startTime = System.currentTimeMillis();
+    // Something badly wrong if takes ten seconds to register a region.
+    long endTime = startTime + 10000;
+    // Poll every 10 ms until the node shows up, the manager stops, or ten seconds pass.
+    while ((node = regionStates.getRegionNode(regionInfo)) == null && isRunning() &&
+        System.currentTimeMillis() < endTime) {
+      // Presume it not yet added but will be added soon. Let it spew a lot so we can tell if
+      // we are waiting here a lot.
+      LOG.debug("Waiting on " + regionInfo + " to be added to regionStateMap");
+      Threads.sleep(10);
+    }
+    if (node == null) {
+      // A stopped manager is reported as "not assigned"; a registration timeout is an error.
+      if (!isRunning()) return false;
+      throw new RegionException(regionInfo.getRegionNameAsString() + " never registered with Assigment.");
+    }
+
+    RegionTransitionProcedure proc = node.getProcedure();
+    if (proc == null) {
+      // No procedure is attached to the region, so there is nothing to wait on.
+      throw new NoSuchProcedureException(node.toString());
+    }
+
+    // The caller's timeout applies only to this wait on the procedure.
+    ProcedureSyncWait.waitForProcedureToCompleteIOE(
+      master.getMasterProcedureExecutor(), proc.getProcId(), timeout);
+    return true;
+  }
+
+  // ============================================================================================
+  //  RegionTransition procedures helpers
+  // ============================================================================================
+
+  /**
+   * Creates one AssignProcedure per region with forceNewPlan set to false.
+   * @param regionInfo the regions to assign
+   * @return the procedures in iteration order, or null if the collection is empty
+   */
+  public AssignProcedure[] createAssignProcedures(final Collection<HRegionInfo> regionInfo) {
+    return createAssignProcedures(regionInfo, false);
+  }
+
+  /**
+   * Creates one AssignProcedure per region.
+   * @param regionInfo the regions to assign
+   * @param forceNewPlan handed to every created AssignProcedure
+   * @return the procedures in iteration order, or null (not an empty array) if the
+   *   collection is empty
+   */
+  public AssignProcedure[] createAssignProcedures(final Collection<HRegionInfo> regionInfo,
+      final boolean forceNewPlan) {
+    if (regionInfo.isEmpty()) return null;
+    final AssignProcedure[] procs = new AssignProcedure[regionInfo.size()];
+    int index = 0;
+    for (HRegionInfo hri: regionInfo) {
+      procs[index++] = createAssignProcedure(hri, forceNewPlan);
+    }
+    return procs;
+  }
+
+  // Needed for the
following method so it can type the created Array we return
+  private static final UnassignProcedure [] UNASSIGNED_PROCEDURE_FOR_TYPE_INFO =
+      new UnassignProcedure[0];
+
+  /**
+   * Creates UnassignProcedures for the given region nodes. Nodes rejected by
+   * {@code regionStates.include(node, false)} and regions that are already offline are
+   * skipped, so the result can be shorter than the input.
+   * @param nodes the region nodes to unassign
+   * @return the created procedures, or null (not an empty array) if {@code nodes} is empty
+   */
+  UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) {
+    if (nodes.isEmpty()) return null;
+    final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size());
+    for (RegionStateNode node: nodes) {
+      if (!this.regionStates.include(node, false)) continue;
+      // Look for regions that are offline/closed; i.e. already unassigned.
+      if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue;
+      assert node.getRegionLocation() != null: node.toString();
+      procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false));
+    }
+    return procs.toArray(UNASSIGNED_PROCEDURE_FOR_TYPE_INFO);
+  }
+
+  /**
+   * Creates one MoveRegionProcedure per region whose plan has the region's current server
+   * as both source and destination.
+   * @param regionInfo the regions to reopen
+   * @return the procedures in iteration order; an empty array for an empty collection
+   */
+  public MoveRegionProcedure[] createReopenProcedures(final Collection<HRegionInfo> regionInfo) {
+    final MoveRegionProcedure[] procs = new MoveRegionProcedure[regionInfo.size()];
+    int index = 0;
+    for (HRegionInfo hri: regionInfo) {
+      final ServerName serverName = regionStates.getRegionServerOfRegion(hri);
+      // Same source and destination: the region is moved onto the server it is already on.
+      final RegionPlan plan = new RegionPlan(hri, serverName, serverName);
+      procs[index++] = createMoveRegionProcedure(plan);
+    }
+    return procs;
+  }
+
+  /**
+   * Called by things like EnableTableProcedure to get a list of AssignProcedure
+   * to assign the regions of the table.
+   */
+  public AssignProcedure[] createAssignProcedures(final TableName tableName) {
+    return createAssignProcedures(regionStates.getRegionsOfTable(tableName));
+  }
+
+  /**
+   * Called by things like DisableTableProcedure to get a list of UnassignProcedure
+   * to unassign the regions of the table.
+   */
+  public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
+    return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
+  }
+
+  /**
+   * Called by things like ModifyColumnFamilyProcedure to get a list of MoveRegionProcedure
+   * to reopen the regions of the table.
+   */
+  public MoveRegionProcedure[] createReopenProcedures(final TableName tableName) {
+    return createReopenProcedures(regionStates.getRegionsOfTable(tableName));
+  }
+
+  /**
+   * Creates an AssignProcedure owned by the current request user.
+   * @param regionInfo the region to assign
+   * @param forceNewPlan handed to the AssignProcedure constructor
+   * @return the new procedure; it is not submitted here
+   */
+  public AssignProcedure createAssignProcedure(final HRegionInfo regionInfo,
+      final boolean forceNewPlan) {
+    AssignProcedure proc = new AssignProcedure(regionInfo, forceNewPlan);
+    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
+    return proc;
+  }
+
+  /**
+   * Creates an AssignProcedure for a specific target server, owned by the current request
+   * user.
+   * @param regionInfo the region to assign
+   * @param targetServer handed to the AssignProcedure constructor
+   * @return the new procedure; it is not submitted here
+   */
+  public AssignProcedure createAssignProcedure(final HRegionInfo regionInfo,
+      final ServerName targetServer) {
+    AssignProcedure proc = new AssignProcedure(regionInfo, targetServer);
+    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
+    return proc;
+  }
+
+  /**
+   * Creates an UnassignProcedure owned by the current request user.
+   * @param regionInfo the region to unassign
+   * @param destinationServer the server handed to the procedure; when null, the server name
+   *   recorded in the region's RegionState is used instead
+   * @param force handed to the UnassignProcedure constructor
+   * @return the new procedure; it is not submitted here
+   */
+  public UnassignProcedure createUnassignProcedure(final HRegionInfo regionInfo,
+      final ServerName destinationServer, final boolean force) {
+    // If destinationServer is null, figure it.
+    ServerName sn = destinationServer != null?
destinationServer:
+      getRegionStates().getRegionState(regionInfo).getServerName();
+    assert sn != null;
+    UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force);
+    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
+    return proc;
+  }
+
+  /**
+   * Creates a MoveRegionProcedure for the plan, owned by the current request user.
+   * @param plan the move to perform
+   * @return the new procedure; it is not submitted here
+   */
+  public MoveRegionProcedure createMoveRegionProcedure(final RegionPlan plan) {
+    MoveRegionProcedure proc = new MoveRegionProcedure(plan);
+    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
+    return proc;
+  }
+
+
+  /**
+   * Creates a SplitTableRegionProcedure in the master procedure environment.
+   * @param regionToSplit the region to split
+   * @param splitKey handed to the procedure constructor
+   * @return the new procedure; it is not submitted here
+   */
+  public SplitTableRegionProcedure createSplitProcedure(final HRegionInfo regionToSplit,
+      final byte[] splitKey) throws IOException {
+    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
+  }
+
+  /**
+   * Creates a MergeTableRegionsProcedure in the master procedure environment.
+   * @param regionToMergeA first region handed to the procedure constructor
+   * @param regionToMergeB second region handed to the procedure constructor
+   * @return the new procedure; it is not submitted here
+   */
+  public MergeTableRegionsProcedure createMergeProcedure(final HRegionInfo regionToMergeA,
+      final HRegionInfo regionToMergeB) throws IOException {
+    return new MergeTableRegionsProcedure(getProcedureEnvironment(), regionToMergeA,regionToMergeB);
+  }
+
+  /**
+   * Delete the region states. This is called by "DeleteTable"
+   * The table's regions are first deleted through the region state store and then removed
+   * from the in-memory region states.
+   * @param tableName the table whose regions are removed
+   */
+  public void deleteTable(final TableName tableName) throws IOException {
+    final ArrayList<HRegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
+    regionStateStore.deleteRegions(regions);
+    for (int i = 0; i < regions.size(); ++i) {
+      final HRegionInfo regionInfo = regions.get(i);
+      // we expect the region to be offline
+      regionStates.removeFromOfflineRegions(regionInfo);
+      regionStates.deleteRegion(regionInfo);
+    }
+  }
+
+  // ============================================================================================
+  //  RS Region Transition Report helpers
+  // ============================================================================================
+  // TODO: Move this code in MasterRpcServices and call on specific event?
+  /**
+   * Applies the region state transitions reported by a region server.
+   * OPENED, FAILED_OPEN and CLOSED carry one region and go to updateRegionTransition; the
+   * split codes and the merge codes carry three regions and go to
+   * updateRegionSplitTransition and updateRegionMergeTransition respectively. Transition
+   * codes not listed in the switch are ignored because there is no default branch.
+   * @param req the report; the sending server is taken from {@code req.getServer()}
+   * @return the built response; an error message is set on it when handling a transition
+   *   failed with an IOException or UnsupportedOperationException
+   * @throws PleaseHoldException rethrown unchanged after being logged at debug level
+   */
+  public ReportRegionStateTransitionResponse reportRegionStateTransition(
+      final ReportRegionStateTransitionRequest req)
+  throws PleaseHoldException {
+    final ReportRegionStateTransitionResponse.Builder builder =
+        ReportRegionStateTransitionResponse.newBuilder();
+    final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
+    try {
+      for (RegionStateTransition transition: req.getTransitionList()) {
+        switch (transition.getTransitionCode()) {
+          case OPENED:
+          case FAILED_OPEN:
+          case CLOSED:
+            assert transition.getRegionInfoCount() == 1 : transition;
+            final HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
+            // Pass NO_SEQNUM when the report carries no open sequence number.
+            updateRegionTransition(serverName, transition.getTransitionCode(), hri,
+                transition.hasOpenSeqNum() ?  transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
+            break;
+          case READY_TO_SPLIT:
+          case SPLIT_PONR:
+          case SPLIT:
+          case SPLIT_REVERTED:
+            // Region 0 is the parent, regions 1 and 2 are the daughters.
+            assert transition.getRegionInfoCount() == 3 : transition;
+            final HRegionInfo parent = HRegionInfo.convert(transition.getRegionInfo(0));
+            final HRegionInfo splitA = HRegionInfo.convert(transition.getRegionInfo(1));
+            final HRegionInfo splitB = HRegionInfo.convert(transition.getRegionInfo(2));
+            updateRegionSplitTransition(serverName, transition.getTransitionCode(),
+              parent, splitA, splitB);
+            break;
+          case READY_TO_MERGE:
+          case MERGE_PONR:
+          case MERGED:
+          case MERGE_REVERTED:
+            // Region 0 is the merged region, regions 1 and 2 are the regions being merged.
+            assert transition.getRegionInfoCount() == 3 : transition;
+            final HRegionInfo merged = HRegionInfo.convert(transition.getRegionInfo(0));
+            final HRegionInfo mergeA = HRegionInfo.convert(transition.getRegionInfo(1));
+            final HRegionInfo mergeB = HRegionInfo.convert(transition.getRegionInfo(2));
+            updateRegionMergeTransition(serverName, transition.getTransitionCode(),
+              merged, mergeA, mergeB);
+            break;
+        }
+      }
+    } catch (PleaseHoldException e) {
+      LOG.debug("failed to transition: " + e.getMessage());
+      throw e;
+    } catch (UnsupportedOperationException|IOException e) {
+      // TODO: at the moment we have a single
error message and the RS will abort + // if the master says that one of the region transition failed. + LOG.warn("failed to transition: " + e.getMessage()); + builder.setErrorMessage("failed to transition: " + e.getMessage()); + } + return builder.build(); + } + + private void updateRegionTransition(final ServerName serverName, final TransitionCode state, + final HRegionInfo regionInfo, final long seqId) + throws PleaseHoldException, UnexpectedStateException { + checkFailoverCleanupCompleted(regionInfo); + + final RegionStateNode regionNode = regionStates.getRegionNode(regionInfo); + if (regionNode == null) { + // the table/region is gone. maybe a delete, split, merge + throw new UnexpectedStateException(String.format( + "Server %s was trying to transition region %s to %s. but the region was removed.", + serverName, regionInfo, state)); + } + + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Update region transition serverName=%s region=%s state=%s", + serverName, regionNode, state)); + } + + final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); + if (!reportTransition(regionNode, serverNode, state, seqId)) { + LOG.warn(String.format( + "No procedure for %s. 
server=%s to transition to %s", regionNode, serverName, state)); + } + } + + private boolean reportTransition(final RegionStateNode regionNode, + final ServerStateNode serverNode, final TransitionCode state, final long seqId) + throws UnexpectedStateException { + final ServerName serverName = serverNode.getServerName(); + synchronized (regionNode) { + final RegionTransitionProcedure proc = regionNode.getProcedure(); + if (proc == null) return false; + + //serverNode.getReportEvent().removeProcedure(proc); + proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), + serverName, state, seqId); + return true; + } + } + + private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state, + final HRegionInfo parent, final HRegionInfo hriA, final HRegionInfo hriB) + throws IOException { + checkFailoverCleanupCompleted(parent); + + if (state != TransitionCode.READY_TO_SPLIT) { + throw new UnexpectedStateException("unsupported split state=" + state + + " for parent region " + parent + + " maybe an old RS (< 2.0) had the operation in progress"); + } + + // sanity check on the request + if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { + throw new UnsupportedOperationException( + "unsupported split request with bad keys: parent=" + parent + + " hriA=" + hriA + " hriB=" + hriB); + } + + try { + if (regionStateListener != null) { + regionStateListener.onRegionSplit(parent); + } + } catch (QuotaExceededException e) { + // TODO: does this really belong here? 
+ master.getRegionNormalizer().planSkipped(parent, PlanType.SPLIT); + throw e; + } + + // Submit the Split procedure + final byte[] splitKey = hriB.getStartKey(); + if (LOG.isDebugEnabled()) { + LOG.debug("Split request from " + serverName + + ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey)); + } + master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); + + // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split + if (regionStates.getOrCreateServer(serverName).getVersionNumber() < 0x0200000) { + throw new UnsupportedOperationException(String.format( + "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB)); + } + } + + private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state, + final HRegionInfo merged, final HRegionInfo hriA, final HRegionInfo hriB) + throws PleaseHoldException, UnexpectedStateException, IOException { + checkFailoverCleanupCompleted(merged); + + if (state != TransitionCode.READY_TO_MERGE) { + throw new UnexpectedStateException("Unsupported merge state=" + state + + " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged + + " maybe an old RS (< 2.0) had the operation in progress"); + } + + // Submit the Merge procedure + if (LOG.isDebugEnabled()) { + LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); + } + master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); + + // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge + if (regionStates.getOrCreateServer(serverName).getVersionNumber() < 0x0200000) { + throw new UnsupportedOperationException(String.format( + "Merge not handled yet: state=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, hriB)); + } + } + + // ============================================================================================ + // RS Status 
update (report online regions) helpers + // ============================================================================================ + /** + * the master will call this method when the RS send the regionServerReport(). + * the report will contains the "hbase version" and the "online regions". + * this method will check the the online regions against the in-memory state of the AM, + * if there is a mismatch we will try to fence out the RS with the assumption + * that something went wrong on the RS side. + */ + public void reportOnlineRegions(final ServerName serverName, + final int versionNumber, final Set<byte[]> regionNames) { + if (!isRunning()) return; + if (LOG.isTraceEnabled()) { + LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() + + ", metaLoaded=" + isMetaLoaded() + " " + + regionNames.stream().map(element -> Bytes.toStringBinary(element)). + collect(Collectors.toList())); + } + + final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); + + // update the server version number. This will be used for live upgrades. + synchronized (serverNode) { + serverNode.setVersionNumber(versionNumber); + if (serverNode.isInState(ServerState.SPLITTING, ServerState.OFFLINE)) { + LOG.warn("Got a report from a server result in state " + serverNode.getState()); + return; + } + } + + if (regionNames.isEmpty()) { + // nothing to do if we don't have regions + LOG.trace("no online region found on " + serverName); + } else if (!isMetaLoaded()) { + // if we are still on startup, discard the report unless is from someone holding meta + checkOnlineRegionsReportForMeta(serverNode, regionNames); + } else { + // The Heartbeat updates us of what regions are only. check and verify the state. 
+ checkOnlineRegionsReport(serverNode, regionNames); + } + + // wake report event + wakeServerReportEvent(serverNode); + } + + public void checkOnlineRegionsReportForMeta(final ServerStateNode serverNode, + final Set<byte[]> regionNames) { + try { + for (byte[] regionName: regionNames) { + final HRegionInfo hri = getMetaRegionFromName(regionName); + if (hri == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Skip online report for region=" + Bytes.toStringBinary(regionName) + + " while meta is loading"); + } + continue; + } + + final RegionStateNode regionNode = regionStates.getOrCreateRegionNode(hri); + LOG.info("META REPORTED: " + regionNode); + if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) { + LOG.warn("META REPORTED but no procedure found"); + regionNode.setRegionLocation(serverNode.getServerName()); + } else if (LOG.isTraceEnabled()) { + LOG.trace("META REPORTED: " + regionNode); + } + } + } catch (UnexpectedStateException e) { + final ServerName serverName = serverNode.getServerName(); + LOG.warn("Killing " + serverName + ": " + e.getMessage()); + killRegionServer(serverNode); + } + } + + public void checkOnlineRegionsReport(final ServerStateNode serverNode, + final Set<byte[]> regionNames) { + final ServerName serverName = serverNode.getServerName(); + try { + for (byte[] regionName: regionNames) { + if (!isRunning()) return; + + final RegionStateNode regionNode = regionStates.getRegionNodeFromName(regionName); + if (regionNode == null) { + throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName)); + } + + synchronized (regionNode) { + if (regionNode.isInState(State.OPENING, State.OPEN)) { + if (!regionNode.getRegionLocation().equals(serverName)) { + throw new UnexpectedStateException( + "Reported OPEN on server=" + serverName + + " but state found says server=" + regionNode.getRegionLocation()); + } else if (regionNode.isInState(State.OPENING)) { + try { + if (!reportTransition(regionNode, serverNode, 
TransitionCode.OPENED, 0)) { + LOG.warn("Reported OPEN on server=" + serverName + + " but state found says " + regionNode + " and NO procedure is running"); + } + } catch (UnexpectedStateException e) { + LOG.warn("Unexpected exception while trying to report " + regionNode + + " as open: " + e.getMessage(), e); + } + } + } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { + // TODO: We end up killing the RS if we get a report while we already + // transitioned to close or split. we should have a timeout/timestamp to compare + throw new UnexpectedStateException( + "Reported OPEN but state found says " + regionNode.getState()); + } + } + } + } catch (UnexpectedStateException e) { + LOG.warn("Killing " + serverName + ": " + e.getMessage()); + killRegionServer(serverNode); + } + } + + protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) { + final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); + return getProcedureScheduler().waitEvent(serverNode.getReportEvent(), proc); + } + + protected void wakeServerReportEvent(final ServerStateNode serverNode) { + getProcedureScheduler().wakeEvent(serverNode.getReportEvent()); + } + + // ============================================================================================ + // RIT chore + // ============================================================================================ + private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { + public RegionInTransitionChore(final int timeoutMsec) { + super(timeoutMsec); + } + + @Override + protected void periodicExecute(final MasterProcedureEnv env) { + final AssignmentManager am = env.getAssignmentManager(); + + final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); + if (ritStat.hasRegionsOverThreshold()) { + for (RegionState hri: ritStat.getRegionOverThreshold()) { + am.handleRegionOverStuckWarningThreshold(hri.getRegion()); + } + } + 
+ // update metrics + am.updateRegionsInTransitionMetrics(ritStat); + } + } + + public RegionInTransitionStat computeRegionInTransitionStat() { + final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); + rit.update(this); + return rit; + } + + public static class RegionInTransitionStat { + private final int ritThreshold; + + private HashMap<String, RegionState> ritsOverThreshold = null; + private long statTimestamp; + private long oldestRITTime = 0; + private int totalRITsTwiceThreshold = 0; + private int totalRITs = 0; + + @VisibleForTesting + public RegionInTransitionStat(final Configuration conf) { + this.ritThreshold = + conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); + } + + public int getRITThreshold() { + return ritThreshold; + } + + public long getTimestamp() { + return statTimestamp; + } + + public int getTotalRITs() { + return totalRITs; + } + + public long getOldestRITTime() { + return oldestRITTime; + } + + public int getTotalRITsOverThreshold() { + Map<String, RegionState> m = this.ritsOverThreshold; + return m != null ? m.size() : 0; + } + + public boolean hasRegionsTwiceOverThreshold() { + return totalRITsTwiceThreshold > 0; + } + + public boolean hasRegionsOverThreshold() { + Map<String, RegionState> m = this.ritsOverThreshold; + return m != null && !m.isEmpty(); + } + + public Collection<RegionState> getRegionOverThreshold() { + Map<String, RegionState> m = this.ritsOverThreshold; + return m != null? m.values(): Collections.EMPTY_SET; + } + + public boolean isRegionOverThreshold(final HRegionInfo regionInfo) { + Map<String, RegionState> m = this.ritsOverThreshold; + return m != null? 
m.containsKey(regionInfo.getEncodedName()): false; + } + + public boolean isRegionTwiceOverThreshold(final HRegionInfo regionInfo) { + Map<String, RegionState> m = this.ritsOverThreshold; + if (m == null) return false; + final RegionState state = m.get(regionInfo.getEncodedName()); + if (state == null) return false; + return (statTimestamp - state.getStamp()) > (ritThreshold * 2); + } + + protected void update(final AssignmentManager am) { + final RegionStates regionStates = am.getRegionStates(); + this.statTimestamp = EnvironmentEdgeManager.currentTime(); + update(regionStates.getRegionsStateInTransition(), statTimestamp); + update(regionStates.getRegionFailedOpen(), statTimestamp); + } + + private void update(final Collection<RegionState> regions, final long currentTime) { + for (RegionState state: regions) { + totalRITs++; + final long ritTime = currentTime - state.getStamp(); + if (ritTime > ritThreshold) { + if (ritsOverThreshold == null) { + ritsOverThreshold = new HashMap<String, RegionState>(); + } + ritsOverThreshold.put(state.getRegion().getEncodedName(), state); + totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 
1 : 0; + } + if (oldestRITTime < ritTime) { + oldestRITTime = ritTime; + } + } + } + } + + private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { + metrics.updateRITOldestAge(ritStat.getOldestRITTime()); + metrics.updateRITCount(ritStat.getTotalRITs()); + metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); + } + + private void handleRegionOverStuckWarningThreshold(final HRegionInfo regionInfo) { + final RegionStateNode regionNode = regionStates.getRegionNode(regionInfo); + //if (regionNode.isStuck()) { + LOG.warn("TODO Handle stuck in transition: " + regionNode); + } + + // ============================================================================================ + // TODO: Master load/bootstrap + // ============================================================================================ + public void joinCluster() throws IOException { + final long startTime = System.currentTimeMillis(); + + LOG.info("Joining the cluster..."); + + // Scan hbase:meta to build list of existing regions, servers, and assignment + loadMeta(); + + for (int i = 0; master.getServerManager().countOfRegionServers() < 1; ++i) { + LOG.info("waiting for RS to join"); + Threads.sleep(250); + } + LOG.info("RS joined " + master.getServerManager().countOfRegionServers()); + + // This method will assign all user regions if a clean server startup or + // it will reconstruct master state and cleanup any leftovers from previous master process. 
+ boolean failover = processofflineServersWithOnlineRegions(); + + // Start the RIT chore + master.getMasterProcedureExecutor().addChore(this.ritChore); + + LOG.info(String.format("Joined the cluster in %s, failover=%s", + StringUtils.humanTimeDiff(System.currentTimeMillis() - startTime), failover)); + } + + private void loadMeta() throws IOException { + // TODO: use a thread pool + regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() { + @Override + public void visitRegionState(final HRegionInfo regionInfo, final State state, + final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { + final RegionStateNode regionNode = regionStates.getOrCreateRegionNode(regionInfo); + synchronized (regionNode) { + if (!regionNode.isInTransition()) { + regionNode.setState(state); + regionNode.setLastHost(lastHost); + regionNode.setRegionLocation(regionLocation); + regionNode.setOpenSeqNum(openSeqNum); + + if (state == State.OPEN) { + assert regionLocation != null : "found null region location for " + regionNode; + regionStates.addRegionToServer(regionLocation, regionNode); + } else if (state == State.OFFLINE || regionInfo.isOffline()) { + regionStates.addToOfflineRegions(regionNode); + } else { + // These regions should have a procedure in replay + regionStates.addRegionInTransition(regionNode, null); + } + } + } + } + }); + + // every assignment is blocked until meta is loaded. + wakeMetaLoadedEvent(); + } + + // TODO: the assumption here is that if RSs are crashing while we are executing this + // they will be handled by the SSH that will be putted in the ServerManager "queue". + // we can integrate this a bit better. 
+ private boolean processofflineServersWithOnlineRegions() { + boolean failover = !master.getServerManager().getDeadServers().isEmpty(); + + final Set<ServerName> offlineServersWithOnlineRegions = new HashSet<ServerName>(); + final ArrayList<HRegionInfo> regionsToAssign = new ArrayList<HRegionInfo>(); + long st, et; + + st = System.currentTimeMillis(); + for (RegionStateNode regionNode: regionStates.getRegionNodes()) { + if (regionNode.getState() == State.OPEN) { + final ServerName serverName = regionNode.getRegionLocation(); + if (!master.getServerManager().isServerOnline(serverName)) { + offlineServersWithOnlineRegions.add(serverName); + } + } else if (regionNode.getState() == State.OFFLINE) { + if (isTableEnabled(regionNode.getTable())) { + regionsToAssign.add(regionNode.getRegionInfo()); + } + } + } + et = System.currentTimeMillis(); + LOG.info("[STEP-1] " + StringUtils.humanTimeDiff(et - st)); + + // kill servers with online regions + st = System.currentTimeMillis(); + for (ServerName serverName: offlineServersWithOnlineRegions) { + if (!master.getServerManager().isServerOnline(serverName)) { + LOG.info("KILL RS hosting regions but not online " + serverName + + " (master=" + master.getServerName() + ")"); + killRegionServer(serverName); + } + } + et = System.currentTimeMillis(); + LOG.info("[STEP-2] " + StringUtils.humanTimeDiff(et - st)); + + setFailoverCleanupDone(true); + + // assign offline regions + st = System.currentTimeMillis(); + for (HRegionInfo regionInfo: regionsToAssign) { + master.getMasterProcedureExecutor().submitProcedure( + createAssignProcedure(regionInfo, false)); + } + et = System.currentTimeMillis(); + LOG.info("[STEP-3] " + StringUtils.humanTimeDiff(et - st)); + + return failover; + } + + /** + * Used by ServerCrashProcedure to make sure AssignmentManager has completed + * the failover cleanup before re-assigning regions of dead servers. So that + * when re-assignment happens, AssignmentManager has proper region states. 
+ */ + public boolean isFailoverCleanupDone() { + return failoverCleanupDone.isReady(); + } + + /** + * Used by ServerCrashProcedure tests verify the ability to suspend the + * execution of the ServerCrashProcedure. + */ + @VisibleForTesting + public void setFailoverCleanupDone(final boolean b) { + master.getMasterProcedureExecutor().getEnvironment() + .setEventReady(failoverCleanupDone, b); + } + + public ProcedureEvent getFailoverCleanupEvent() { + return failoverCleanupDone; + } + + /** + * Used to check if the failover cleanup is done. + * if not we throw PleaseHoldException since we are rebuilding the RegionStates + * @param hri region to check if it is already rebuild + * @throws PleaseHoldException if the failover cleanup is not completed + */ + private void checkFailoverCleanupCompleted(final HRegionInfo hri) throws PleaseHoldException { + if (!isRunning()) { + throw new PleaseHoldException("AssignmentManager not running"); + } + + // TODO: can we avoid throwing an exception if hri is already loaded? 
+ // at the moment we bypass only meta + boolean meta = isMetaRegion(hri); + boolean cleanup = isFailoverCleanupDone(); + if (!isMetaRegion(hri) && !isFailoverCleanupDone()) { + String msg = "Master not fully online; hbase:meta=" + meta + ", failoverCleanup=" + cleanup; + throw new PleaseHoldException(msg); + } + } + + // ============================================================================================ + // TODO: Metrics + // ============================================================================================ + public int getNumRegionsOpened() { + // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value + return 0; + } + + public void submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) { + boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(serverName); + ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); + procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName, + shouldSplitWal, carryingMeta)); + LOG.debug("Added=" + serverName + + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); + } + + public void offlineRegion(final HRegionInfo regionInfo) throws IOException { + // TODO used by MasterRpcServices ServerCrashProcedure + final RegionStateNode node = regionStates.getRegionNode(regionInfo); + if (node != null) node.offline(); + } + + public void onlineRegion(final HRegionInfo regionInfo, final ServerName serverName) { + // TODO used by TestSplitTransactionOnCluster.java + } + + public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment( + final Collection<HRegionInfo> regions) { + return regionStates.getSnapShotOfAssignment(regions); + } + + // ============================================================================================ + // TODO: UTILS/HELPERS? 
+ // ============================================================================================ + /** + * Used by the client (via master) to identify if all regions have the schema updates + * + * @param tableName + * @return Pair indicating the status of the alter command (pending/total) + * @throws IOException + */ + public Pair<Integer, Integer> getReopenStatus(TableName tableName) + throws IOException { + if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0); + + final List<RegionState> states = regionStates.getTableRegionStates(tableName); + int ritCount = 0; + for (RegionState regionState: states) { + if (!regionState.isOpened()) ritCount++; + } + return new Pair<Integer, Integer>(ritCount, states.size()); + } + + // ============================================================================================ + // TODO: Region State In Transition + // ============================================================================================ + protected boolean addRegionInTransition(final RegionStateNode regionNode, + final RegionTransitionProcedure procedure) { + return regionStates.addRegionInTransition(regionNode, procedure); + } + + protected void removeRegionInTransition(final RegionStateNode regionNode, + final RegionTransitionProcedure procedure) { + regionStates.removeRegionInTransition(regionNode, procedure); + } + + public boolean hasRegionsInTransition() { + return regionStates.hasRegionsInTransition(); + } + + public List<RegionStateNode> getRegionsInTransition() { + return regionStates.getRegionsInTransition(); + } + + public List<HRegionInfo> getAssignedRegions() { + return regionStates.getAssignedRegions(); + } + + public HRegionInfo getRegionInfo(final byte[] regionName) { + final RegionStateNode regionState = regionStates.getRegionNodeFromName(regionName); + return regionState != null ? 
regionState.getRegionInfo() : null; + } + + // ============================================================================================ + // TODO: Region Status update + // ============================================================================================ + private void sendRegionOpenedNotification(final HRegionInfo regionInfo, + final ServerName serverName) { + getBalancer().regionOnline(regionInfo, serverName); + if (!this.listeners.isEmpty()) { + for (AssignmentListener listener : this.listeners) { + listener.regionOpened(regionInfo, serverName); + } + } + } + + private void sendRegionClosedNotification(final HRegionInfo regionInfo) { + getBalancer().regionOffline(regionInfo); + if (!this.listeners.isEmpty()) { + for (AssignmentListener listener : this.listeners) { + listener.regionClosed(regionInfo); + } + } + } + + public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException { + synchronized (regionNode) { + State state = regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN); + regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); + regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, + regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM, + regionNode.getProcedure().getProcId()); + } + + // update the operation count metrics + metrics.incrementOperationCounter(); + } + + public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException { + final HRegionInfo hri = regionNode.getRegionInfo(); + synchronized (regionNode) { + State state = regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN); + if (isMetaRegion(hri)) { + setMetaInitialized(hri, true); + } + regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); + // TODO: OPENING Updates hbase:meta too... we need to do both here and there? + // That is a lot of hbase:meta writing. 
+ regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, + regionNode.getRegionLocation(), regionNode.getLastHost(), regionNode.getOpenSeqNum(), + regionNode.getProcedure().getProcId()); + sendRegionOpenedNotification(hri, regionNode.getRegionLocation()); + // update assignment metrics + if (regionNode.getProcedure() != null) { + metrics.updateAssignTime(regionNode.getProcedure().elapsedTime()); + } + } + } + + public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException { + final HRegionInfo hri = regionNode.getRegionInfo(); + synchronized (regionNode) { + State state = regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); + // Set meta has not initialized early. so people trying to create/edit tables will wait + if (isMetaRegion(hri)) { + setMetaInitialized(hri, false); + } + regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); + regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, + regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM, + regionNode.getProcedure().getProcId()); + } + + // update the operation count metrics + metrics.incrementOperationCounter(); + } + + public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException { + final HRegionInfo hri = regionNode.getRegionInfo(); + synchronized (regionNode) { + State state = regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE); + regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); + regionNode.setLastHost(regionNode.getRegionLocation()); + regionNode.setRegionLocation(null); + regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, + regionNode.getRegionLocation()/*null*/, regionNode.getLastHost(), + HConstants.NO_SEQNUM, regionNode.getProcedure().getProcId()); + sendRegionClosedNotification(hri); + // Update assignment metrics + if (regionNode.getProcedure() != null) { + 
metrics.updateUnassignTime(regionNode.getProcedure().elapsedTime()); + } + } + } + + public void markRegionAsSplit(final HRegionInfo parent, final ServerName serverName, + final HRegionInfo daughterA, final HRegionInfo daughterB) + throws IOException { + // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. + // The parent stays in regionStates until cleared when removed by CatalogJanitor. + // Update its state in regionStates to it shows as offline and split when read + // later figuring what regions are in a table and what are not: see + // regionStates#getRegionsOfTable + final RegionStateNode node = regionStates.getOrCreateRegionNode(parent); + node.setState(State.SPLIT); + regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); + if (shouldAssignFavoredNodes(parent)) { + List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); + ((FavoredNodesPromoter)getBalancer()). + generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB); + } + } + + /** + * When called here, the merge has happened. The two merged regions have been + * unassigned and the above markRegionClosed has been called on each so they have been + * disassociated from a hosting Server. The merged region will be open after this call. The + * merged regions are removed from hbase:meta below> Later they are deleted from the filesystem + * by the catalog janitor running against hbase:meta. It notices when the merged region no + * longer holds references to the old regions. 
+ */ + public void markRegionAsMerged(final HRegionInfo child, final ServerName serverName, + final HRegionInfo mother, final HRegionInfo father) throws IOException { + final RegionStateNode node = regionStates.getOrCreateRegionNode(child); + node.setState(State.MERGED); + regionStates.deleteRegion(mother); + regionStates.deleteRegion(father); + regionStateStore.mergeRegions(child, mother, father, serverName); + if (shouldAssignFavoredNodes(child)) { + ((FavoredNodesPromoter)getBalancer()). + generateFavoredNodesForMergedRegion(child, mother, father); + } + } + + /* + * Favored nodes should be applied only when FavoredNodes balancer is configured and the region + * belongs to a non-system table. + */ + private boolean shouldAssignFavoredNodes(HRegionInfo region) { + return this.shouldAssignRegionsWithFavoredNodes && + FavoredNodesManager.isFavoredNodeApplicable(region); + } + + // ============================================================================================ + // Assign Queue (Assign/Balance) + // ============================================================================================ + private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); + private final ReentrantLock assignQueueLock = new ReentrantLock(); + private final Condition assignQueueFullCond = assignQueueLock.newCondition(); + + /** + * Add the assign operation to the assignment queue. + * The pending assignment operation will be processed, + * and each region will be assigned by a server using the balancer. + */ + protected void queueAssign(final RegionStateNode regionNode) { + getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent()); + + // TODO: quick-start for meta and the other sys-tables? 
+ assignQueueLock.lock(); + try { + pendingAssignQueue.add(regionNode); + if (regionNode.isSystemTable() || + pendingAssignQueue.size() == 1 || + pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) { + assignQueueFullCond.signal(); + } + } finally { + assignQueueLock.unlock(); + } + } + + private void startAssignmentThread() { + assignThread = new Thread("AssignmentThread") { + @Override + public void run() { + while (isRunning()) { + processAssignQueue(); + } + pendingAssignQueue.clear(); + } + }; + assignThread.start(); + } + + private void stopAssignmentThread() { + assignQueueSignal(); + try { + while (assignThread.isAlive()) { + assignQueueSignal(); + assignThread.join(250); + } + } catch (InterruptedException e) { + LOG.warn("join interrupted", e); + Thread.currentThread().interrupt(); + } + } + + private void assignQueueSignal() { + assignQueueLock.lock(); + try { + assignQueueFullCond.signal(); + } finally { + assignQueueLock.unlock(); + } + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + private HashMap<HRegionInfo, RegionStateNode> waitOnAssignQueue() { + HashMap<HRegionInfo, RegionStateNode> regions = null; + + assignQueueLock.lock(); + try { + if (pendingAssignQueue.isEmpty() && isRunning()) { + assignQueueFullCond.await(); + } + + if (!isRunning()) return null; + assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); + regions = new HashMap<HRegionInfo, RegionStateNode>(pendingAssignQueue.size()); + for (RegionStateNode regionNode: pendingAssignQueue) { + regions.put(regionNode.getRegionInfo(), regionNode); + } + pendingAssignQueue.clear(); + } catch (InterruptedException e) { + LOG.warn("got interrupted ", e); + Thread.currentThread().interrupt(); + } finally { + assignQueueLock.unlock(); + } + return regions; + } + + private void processAssignQueue() { + final HashMap<HRegionInfo, RegionStateNode> regions = waitOnAssignQueue(); + if (regions == null || regions.size() == 0 || 
!isRunning()) { + return; + } + + if (LOG.isTraceEnabled()) { + LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); + } + + // TODO: Optimize balancer. pass a RegionPlan? + final HashMap<HRegionInfo, ServerName> retainMap = new HashMap<HRegionInfo, ServerName>(); + final List<HRegionInfo> rrList = new ArrayList<HRegionInfo>(); + for (RegionStateNode regionNode: regions.values()) { + if (regionNode.getRegionLocation() != null) { + retainMap.put(regionNode.getRegionInfo(), regionNode.getRegionLocation()); + } else { + rrList.add(regionNode.getRegionInfo()); + } + } + + // TODO: connect with the listener to invalidate the cache + final LoadBalancer balancer = getBalancer(); + + // TODO use events + List<ServerName> servers = master.getServerManager().createDestinationServersList(); + for (int i = 0; servers.size() < 1; ++i) { + if (i % 4 == 0) { + LOG.warn("no server available, unable to find a location for " + regions.size() + + " unassigned regions. waiting"); + } + + // the was AM killed + if (!isRunning()) { + LOG.debug("aborting assignment-queue with " + regions.size() + " not assigned"); + return; + } + + Threads.sleep(250); + servers = master.getServerManager().createDestinationServersList(); + } + + final boolean isTraceEnabled = LOG.isTraceEnabled(); + if (isTraceEnabled) { + LOG.trace("available servers count=" + servers.size() + ": " + servers); + } + + // ask the balancer where to place regions + if (!retainMap.isEmpty()) { + if (isTraceEnabled) { + LOG.trace("retain assign regions=" + retainMap); + } + try { + acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); + } catch (HBaseIOException e) { + LOG.warn("unable to retain assignment", e); + addToPendingAssignment(regions, retainMap.keySet()); + } + } + + // TODO: Do we need to split retain and round-robin? + // the retain seems to fallback to round-robin/random if the region is not in the map. 
+ if (!rrList.isEmpty()) { + Collections.sort(rrList); + if (isTraceEnabled) { + LOG.trace("round robin regions=" + rrList); + } + try { + acceptPlan(regions, balancer.roundRobinAssignment(rrList, servers)); + } catch (HBaseIOException e) { + LOG.warn("unable to round-robin assignment", e); + addToPendingAssignment(regions, rrList); + } + } + } + + private void acceptPlan(final HashMap<HRegionInfo, RegionStateNode> regions, + final Map<ServerName, List<HRegionInfo>> plan) throws HBaseIOException { + final ProcedureEvent[] events = new ProcedureEvent[regions.size()]; + final long st = System.currentTimeMillis(); + + if (plan == null) { + throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); + } + + if (plan.isEmpty()) return; + + int evcount = 0; + for (Map.Entry<ServerName, List<HRegionInfo>> entry: plan.entrySet()) { + final ServerName server = entry.getKey(); + for (HRegionInfo hri: entry.getValue()) { + final RegionStateNode regionNode = regions.get(hri); + regionNode.setRegionLocation(server); + events[evcount++] = regionNode.getProcedureEvent(); + } + } + getProcedureScheduler().wakeEvents(evcount, events); + + final long et = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + + StringUtils.humanTimeDiff(et - st)); + } + } + + private void addToPendingAssignment(final HashMap<HRegionInfo, RegionStateNode> regions, + final Collection<HRegionInfo> pendingRegions) { + assignQueueLock.lock(); + try { + for (HRegionInfo hri: pendingRegions) { + pendingAssignQueue.add(regions.get(hri)); + } + } finally { + assignQueueLock.unlock(); + } + } + + // ============================================================================================ + // Server Helpers + // ============================================================================================ + @Override + public void serverAdded(final ServerName serverName) { + } + + @Override + public void serverRemoved(final 
ServerName serverName) { + final ServerStateNode serverNode = regionStates.getServerNode(serverName); + if (serverNode == null) return; + + // just in case, wake procedures waiting for this server report + wakeServerReportEvent(serverNode); + } + + public int getServerVersion(final ServerName serverName) { + final ServerStateNode node = regionStates.getServerNode(serverName); + return node != null ? node.getVersionNumber() : 0; + } + + public void killRegionServer(final ServerName serverName) { + final ServerStateNode serverNode = regionStates.getServerNode(serverName); + killRegionServer(serverNode); + } + + public void killRegionServer(final ServerStateNode serverNode) { + for (RegionStateNode regionNode: serverNode.getRegions()) { + regionNode.offline(); + } + master.getServerManager().expireServer(serverNode.getServerName()); + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java new file mode 100644 index 0000000..111b525 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Used internally signaling failed queue of a remote procedure + * operation. + */ +@SuppressWarnings("serial") +@InterfaceAudience.Private +public class FailedRemoteDispatchException extends HBaseIOException { + public FailedRemoteDispatchException(String msg) { + super(msg); + } +} \ No newline at end of file