http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 4803227..c234767 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -36,7 +35,6 @@ import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -55,24 +53,16 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; -import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; -import org.apache.hadoop.hbase.coordination.RegionMergeCoordination; -import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails; -import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination; -import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination; -import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -82,10 +72,8 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; -import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; -import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; @@ -94,42 +82,25 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; -import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.Triple; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.ipc.RemoteException; -import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.data.Stat; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.LinkedHashMultimap; /** * Manages and performs region assignment. - * <p> - * Monitors ZooKeeper for events related to regions in transition. - * <p> - * Handles existing regions in transition during master failover. + * Related communications with regionserver are all done over RPC. */ @InterfaceAudience.Private -public class AssignmentManager extends ZooKeeperListener { +public class AssignmentManager { private static final Log LOG = LogFactory.getLog(AssignmentManager.class); - public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME, - -1, -1L); - static final String ALREADY_IN_TRANSITION_WAITTIME = "hbase.assignment.already.intransition.waittime"; static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute @@ -187,21 +158,9 @@ public class AssignmentManager extends ZooKeeperListener { private final ExecutorService executorService; - // For unit tests, keep track of calls to ClosedRegionHandler - private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null; - - // For unit tests, keep track of calls to OpenedRegionHandler - private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null; - - //Thread pool executor service for timeout monitor + // Thread pool executor service. TODO, consolidate with executorService? private java.util.concurrent.ExecutorService threadPoolExecutorService; - // A bunch of ZK events workers. Each is a single thread executor service - private final java.util.concurrent.ExecutorService zkEventWorkers; - - private List<EventType> ignoreStatesRSOffline = Arrays.asList( - EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED); - private final RegionStates regionStates; // The threshold to use bulk assigning. Using bulk assignment @@ -236,9 +195,6 @@ public class AssignmentManager extends ZooKeeperListener { private final ConcurrentHashMap<String, AtomicInteger> failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>(); - // A flag to indicate if we are using ZK for region assignment - private final boolean useZKForAssignment; - // In case not using ZK for region assignment, region states // are persisted in meta with a state store private final RegionStateStore regionStateStore; @@ -261,15 +217,14 @@ public class AssignmentManager extends ZooKeeperListener { * @param service Executor service * @param metricsMaster metrics manager * @param tableLockManager TableLock manager - * @throws KeeperException + * @throws CoordinatedStateException * @throws IOException */ public AssignmentManager(Server server, ServerManager serverManager, final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, - final TableLockManager tableLockManager) throws KeeperException, - IOException, CoordinatedStateException { - super(server.getZooKeeper()); + final TableLockManager tableLockManager) + throws IOException, CoordinatedStateException { this.server = server; this.serverManager = serverManager; this.executorService = service; @@ -307,14 +262,8 @@ public class AssignmentManager extends ZooKeeperListener { this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); - int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); - ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); - zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, - TimeUnit.SECONDS, threadFactory); - this.tableLockManager = tableLockManager; - this.metricsAssignmentManager = new MetricsAssignmentManager(); - useZKForAssignment = ConfigUtil.useZKForAssignment(conf); + this.tableLockManager = tableLockManager; } /** @@ -406,9 +355,9 @@ public class AssignmentManager extends ZooKeeperListener { */ public Pair<Integer, Integer> getReopenStatus(TableName tableName) throws IOException { - List <HRegionInfo> hris = - MetaTableAccessor.getTableRegions(this.watcher, this.server.getShortCircuitConnection(), - tableName, true); + List <HRegionInfo> hris = MetaTableAccessor.getTableRegions( + this.server.getZooKeeper(), this.server.getShortCircuitConnection(), + tableName, true); Integer pending = 0; for (HRegionInfo hri : hris) { String name = hri.getEncodedName(); @@ -476,10 +425,6 @@ public class AssignmentManager extends ZooKeeperListener { // previous master process. boolean failover = processDeadServersAndRegionsInTransition(deadServers); - if (!useZKForAssignment) { - // Not use ZK for assignment any more, remove the ZNode - ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode); - } recoverTableInDisablingState(); recoverTableInEnablingState(); LOG.info("Joined the cluster in " + (System.currentTimeMillis() @@ -493,22 +438,12 @@ public class AssignmentManager extends ZooKeeperListener { * startup, will assign all user regions. * @param deadServers * Map of dead servers and their regions. Can be null. - * @throws KeeperException * @throws IOException * @throws InterruptedException + * @throws CoordinatedStateException */ - boolean processDeadServersAndRegionsInTransition( - final Set<ServerName> deadServers) throws KeeperException, - IOException, InterruptedException, CoordinatedStateException { - List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, - watcher.assignmentZNode); - - if (useZKForAssignment && nodes == null) { - String errorMessage = "Failed to get the children from ZK"; - server.abort(errorMessage, new IOException(errorMessage)); - return true; // Doesn't matter in this case - } - + boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers) + throws IOException, InterruptedException, CoordinatedStateException { boolean failover = !serverManager.getDeadServers().isEmpty(); if (failover) { // This may not be a failover actually, especially if meta is on this master. @@ -517,36 +452,28 @@ public class AssignmentManager extends ZooKeeperListener { } } else { // If any one region except meta is assigned, it's a failover. - for (HRegionInfo hri: regionStates.getRegionAssignments().keySet()) { - if (!hri.isMetaTable()) { + Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet(); + for (Map.Entry<HRegionInfo, ServerName> en: + regionStates.getRegionAssignments().entrySet()) { + HRegionInfo hri = en.getKey(); + if (!hri.isMetaTable() + && onlineServers.contains(en.getValue())) { LOG.debug("Found " + hri + " out on cluster"); failover = true; break; } } - } - if (!failover && nodes != null) { - // If any one region except meta is in transition, it's a failover. - for (String encodedName: nodes) { - RegionState regionState = regionStates.getRegionState(encodedName); - if (regionState != null && !regionState.getRegion().isMetaRegion()) { - LOG.debug("Found " + regionState + " in RITs"); - failover = true; - break; - } - } - } - if (!failover && !useZKForAssignment) { - // If any region except meta is in transition on a live server, it's a failover. - Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition(); - if (!regionsInTransition.isEmpty()) { - Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet(); - for (RegionState regionState: regionsInTransition.values()) { - if (!regionState.getRegion().isMetaRegion() - && onlineServers.contains(regionState.getServerName())) { - LOG.debug("Found " + regionState + " in RITs"); - failover = true; - break; + if (!failover) { + // If any region except meta is in transition on a live server, it's a failover. + Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition(); + if (!regionsInTransition.isEmpty()) { + for (RegionState regionState: regionsInTransition.values()) { + if (!regionState.getRegion().isMetaRegion() + && onlineServers.contains(regionState.getServerName())) { + LOG.debug("Found " + regionState + " in RITs"); + failover = true; + break; + } } } } @@ -596,19 +523,8 @@ public class AssignmentManager extends ZooKeeperListener { // Now region states are restored regionStateStore.start(); - // If we found user regions out on cluster, its a failover. if (failover) { - LOG.info("Found regions out on cluster or in RIT; presuming failover"); - // Process list of dead servers and regions in RIT. - // See HBASE-4580 for more information. - processDeadServersAndRecoverLostRegions(deadServers); - } - - if (!failover && useZKForAssignment) { - // Cleanup any existing ZK nodes and start watching - ZKAssign.deleteAllNodes(watcher); - ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, - this.watcher.assignmentZNode); + processDeadServers(deadServers); } // Now we can safely claim failover cleanup completed and enable @@ -632,254 +548,6 @@ public class AssignmentManager extends ZooKeeperListener { } /** - * If region is up in zk in transition, then do fixup and block and wait until - * the region is assigned and out of transition. Used on startup for - * catalog regions. - * @param hri Region to look for. - * @return True if we processed a region in transition else false if region - * was not up in zk in transition. - * @throws InterruptedException - * @throws KeeperException - * @throws IOException - */ - boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri) - throws InterruptedException, KeeperException, IOException { - String encodedRegionName = hri.getEncodedName(); - if (!processRegionInTransition(encodedRegionName, hri)) { - return false; // The region is not in transition - } - LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName)); - while (!this.server.isStopped() && - this.regionStates.isRegionInTransition(encodedRegionName)) { - RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName); - if (state == null || !serverManager.isServerOnline(state.getServerName())) { - // The region is not in transition, or not in transition on an online - // server. Doesn't help to block here any more. Caller need to - // verify the region is actually assigned. - break; - } - this.regionStates.waitForUpdate(100); - } - return true; - } - - /** - * Process failover of new master for region <code>encodedRegionName</code> - * up in zookeeper. - * @param encodedRegionName Region to process failover for. - * @param regionInfo If null we'll go get it from meta table. - * @return True if we processed <code>regionInfo</code> as a RIT. - * @throws KeeperException - * @throws IOException - */ - boolean processRegionInTransition(final String encodedRegionName, - final HRegionInfo regionInfo) throws KeeperException, IOException { - // We need a lock here to ensure that we will not put the same region twice - // It has no reason to be a lock shared with the other operations. - // We can do the lock on the region only, instead of a global lock: what we want to ensure - // is that we don't have two threads working on the same region. - Lock lock = locker.acquireLock(encodedRegionName); - try { - Stat stat = new Stat(); - byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat); - if (data == null) return false; - RegionTransition rt; - try { - rt = RegionTransition.parseFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed parse znode data", e); - return false; - } - HRegionInfo hri = regionInfo; - if (hri == null) { - // The region info is not passed in. We will try to find the region - // from region states map/meta based on the encoded region name. But we - // may not be able to find it. This is valid for online merge that - // the region may have not been created if the merge is not completed. - // Therefore, it is not in meta at master recovery time. - hri = regionStates.getRegionInfo(rt.getRegionName()); - EventType et = rt.getEventType(); - if (hri == null && et != EventType.RS_ZK_REGION_MERGING - && et != EventType.RS_ZK_REQUEST_REGION_MERGE) { - LOG.warn("Couldn't find the region in recovering " + rt); - return false; - } - } - - // TODO: This code is tied to ZK anyway, so for now leaving it as is, - // will refactor when whole region assignment will be abstracted from ZK - BaseCoordinatedStateManager cp = - (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager(); - OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination(); - - ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd = - new ZkOpenRegionCoordination.ZkOpenRegionDetails(); - zkOrd.setVersion(stat.getVersion()); - zkOrd.setServerName(cp.getServer().getServerName()); - - return processRegionsInTransition( - rt, hri, openRegionCoordination, zkOrd); - } finally { - lock.unlock(); - } - } - - /** - * This call is invoked only (1) master assign meta; - * (2) during failover mode startup, zk assignment node processing. - * The locker is set in the caller. It returns true if the region - * is in transition for sure, false otherwise. - * - * It should be private but it is used by some test too. - */ - boolean processRegionsInTransition( - final RegionTransition rt, final HRegionInfo regionInfo, - OpenRegionCoordination coordination, - final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException { - EventType et = rt.getEventType(); - // Get ServerName. Could not be null. - final ServerName sn = rt.getServerName(); - final byte[] regionName = rt.getRegionName(); - final String encodedName = HRegionInfo.encodeRegionName(regionName); - final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName); - LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et); - - if (regionStates.isRegionInTransition(encodedName) - && (regionInfo.isMetaRegion() || !useZKForAssignment)) { - LOG.info("Processed region " + prettyPrintedRegionName + " in state: " - + et + ", does nothing since the region is already in transition " - + regionStates.getRegionTransitionState(encodedName)); - // Just return - return true; - } - if (!serverManager.isServerOnline(sn)) { - // It was transitioning on a dead server, so it's closed now. - // Force to OFFLINE and put it in transition, but not assign it - // since log splitting for the dead server is not done yet. - LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() + - " was on deadserver; forcing offline"); - if (regionStates.isRegionOnline(regionInfo)) { - // Meta could still show the region is assigned to the previous - // server. If that server is online, when we reload the meta, the - // region is put back to online, we need to offline it. - regionStates.regionOffline(regionInfo); - sendRegionClosedNotification(regionInfo); - } - // Put it back in transition so that SSH can re-assign it - regionStates.updateRegionState(regionInfo, State.OFFLINE, sn); - - if (regionInfo.isMetaRegion()) { - // If it's meta region, reset the meta location. - // So that master knows the right meta region server. - MetaTableLocator.setMetaLocation(watcher, sn); - } else { - // No matter the previous server is online or offline, - // we need to reset the last region server of the region. - regionStates.setLastRegionServerOfRegion(sn, encodedName); - // Make sure we know the server is dead. - if (!serverManager.isServerDead(sn)) { - serverManager.expireServer(sn); - } - } - return false; - } - switch (et) { - case M_ZK_REGION_CLOSING: - // Insert into RIT & resend the query to the region server: may be the previous master - // died before sending the query the first time. - final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING); - this.executorService.submit( - new EventHandler(server, EventType.M_MASTER_RECOVERY) { - @Override - public void process() throws IOException { - ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName()); - try { - final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord) - .getVersion(); - unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null); - if (regionStates.isRegionOffline(regionInfo)) { - assign(regionInfo, true); - } - } finally { - lock.unlock(); - } - } - }); - break; - - case RS_ZK_REGION_CLOSED: - case RS_ZK_REGION_FAILED_OPEN: - // Region is closed, insert into RIT and handle it - regionStates.updateRegionState(regionInfo, State.CLOSED, sn); - if (!replicasToClose.contains(regionInfo)) { - invokeAssign(regionInfo); - } else { - offlineDisabledRegion(regionInfo); - } - break; - - case M_ZK_REGION_OFFLINE: - // Insert in RIT and resend to the regionserver - regionStates.updateRegionState(rt, State.PENDING_OPEN); - final RegionState rsOffline = regionStates.getRegionState(regionInfo); - this.executorService.submit( - new EventHandler(server, EventType.M_MASTER_RECOVERY) { - @Override - public void process() throws IOException { - ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName()); - try { - RegionPlan plan = new RegionPlan(regionInfo, null, sn); - addPlan(encodedName, plan); - assign(rsOffline, false, false); - } finally { - lock.unlock(); - } - } - }); - break; - - case RS_ZK_REGION_OPENING: - regionStates.updateRegionState(rt, State.OPENING); - break; - - case RS_ZK_REGION_OPENED: - // Region is opened, insert into RIT and handle it - // This could be done asynchronously, we would need then to acquire the lock in the - // handler. - regionStates.updateRegionState(rt, State.OPEN); - new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process(); - break; - case RS_ZK_REQUEST_REGION_SPLIT: - case RS_ZK_REGION_SPLITTING: - case RS_ZK_REGION_SPLIT: - // Splitting region should be online. We could have skipped it during - // user region rebuilding since we may consider the split is completed. - // Put it in SPLITTING state to avoid complications. - regionStates.regionOnline(regionInfo, sn); - regionStates.updateRegionState(rt, State.SPLITTING); - if (!handleRegionSplitting( - rt, encodedName, prettyPrintedRegionName, sn)) { - deleteSplittingNode(encodedName, sn); - } - break; - case RS_ZK_REQUEST_REGION_MERGE: - case RS_ZK_REGION_MERGING: - case RS_ZK_REGION_MERGED: - if (!handleRegionMerging( - rt, encodedName, prettyPrintedRegionName, sn)) { - deleteMergingNode(encodedName, sn); - } - break; - default: - throw new IllegalStateException("Received region in state:" + et + " is not valid."); - } - LOG.info("Processed region " + prettyPrintedRegionName + " in state " - + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ") - + "server: " + sn); - return true; - } - - /** * When a region is closed, it should be removed from the regionsToReopen * @param hri HRegionInfo of the region which was closed */ @@ -889,247 +557,6 @@ public class AssignmentManager extends ZooKeeperListener { } } - /** - * Handles various states an unassigned node can be in. - * <p> - * Method is called when a state change is suspected for an unassigned node. - * <p> - * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING - * yet). - * @param rt region transition - * @param coordination coordination for opening region - * @param ord details about opening region - */ - void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination, - OpenRegionCoordination.OpenRegionDetails ord) { - if (rt == null) { - LOG.warn("Unexpected NULL input for RegionTransition rt"); - return; - } - final ServerName sn = rt.getServerName(); - // Check if this is a special HBCK transition - if (sn.equals(HBCK_CODE_SERVERNAME)) { - handleHBCK(rt); - return; - } - final long createTime = rt.getCreateTime(); - final byte[] regionName = rt.getRegionName(); - String encodedName = HRegionInfo.encodeRegionName(regionName); - String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName); - // Verify this is a known server - if (!serverManager.isServerOnline(sn) - && !ignoreStatesRSOffline.contains(rt.getEventType())) { - LOG.warn("Attempted to handle region transition for server but " + - "it is not online: " + prettyPrintedRegionName + ", " + rt); - return; - } - - RegionState regionState = - regionStates.getRegionState(encodedName); - long startTime = System.currentTimeMillis(); - if (LOG.isDebugEnabled()) { - boolean lateEvent = createTime < (startTime - 15000); - LOG.debug("Handling " + rt.getEventType() + - ", server=" + sn + ", region=" + - (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) + - (lateEvent ? ", which is more than 15 seconds late" : "") + - ", current_state=" + regionState); - } - // We don't do anything for this event, - // so separate it out, no need to lock/unlock anything - if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) { - return; - } - - // We need a lock on the region as we could update it - Lock lock = locker.acquireLock(encodedName); - try { - RegionState latestState = - regionStates.getRegionState(encodedName); - if ((regionState == null && latestState != null) - || (regionState != null && latestState == null) - || (regionState != null && latestState != null - && latestState.getState() != regionState.getState())) { - LOG.warn("Region state changed from " + regionState + " to " - + latestState + ", while acquiring lock"); - } - long waitedTime = System.currentTimeMillis() - startTime; - if (waitedTime > 5000) { - LOG.warn("Took " + waitedTime + "ms to acquire the lock"); - } - regionState = latestState; - switch (rt.getEventType()) { - case RS_ZK_REQUEST_REGION_SPLIT: - case RS_ZK_REGION_SPLITTING: - case RS_ZK_REGION_SPLIT: - if (!handleRegionSplitting( - rt, encodedName, prettyPrintedRegionName, sn)) { - deleteSplittingNode(encodedName, sn); - } - break; - - case RS_ZK_REQUEST_REGION_MERGE: - case RS_ZK_REGION_MERGING: - case RS_ZK_REGION_MERGED: - // Merged region is a new region, we can't find it in the region states now. - // However, the two merging regions are not new. They should be in state for merging. - if (!handleRegionMerging( - rt, encodedName, prettyPrintedRegionName, sn)) { - deleteMergingNode(encodedName, sn); - } - break; - - case M_ZK_REGION_CLOSING: - // Should see CLOSING after we have asked it to CLOSE or additional - // times after already being in state of CLOSING - if (regionState == null - || !regionState.isPendingCloseOrClosingOnServer(sn)) { - LOG.warn("Received CLOSING for " + prettyPrintedRegionName - + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: " - + regionStates.getRegionState(encodedName)); - return; - } - // Transition to CLOSING (or update stamp if already CLOSING) - regionStates.updateRegionState(rt, State.CLOSING); - break; - - case RS_ZK_REGION_CLOSED: - // Should see CLOSED after CLOSING but possible after PENDING_CLOSE - if (regionState == null - || !regionState.isPendingCloseOrClosingOnServer(sn)) { - LOG.warn("Received CLOSED for " + prettyPrintedRegionName - + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: " - + regionStates.getRegionState(encodedName)); - return; - } - // Handle CLOSED by assigning elsewhere or stopping if a disable - // If we got here all is good. Need to update RegionState -- else - // what follows will fail because not in expected state. - new ClosedRegionHandler(server, this, regionState.getRegion()).process(); - updateClosedRegionHandlerTracker(regionState.getRegion()); - break; - - case RS_ZK_REGION_FAILED_OPEN: - if (regionState == null - || !regionState.isPendingOpenOrOpeningOnServer(sn)) { - LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName - + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: " - + regionStates.getRegionState(encodedName)); - return; - } - AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName); - if (failedOpenCount == null) { - failedOpenCount = new AtomicInteger(); - // No need to use putIfAbsent, or extra synchronization since - // this whole handleRegion block is locked on the encoded region - // name, and failedOpenTracker is updated only in this block - failedOpenTracker.put(encodedName, failedOpenCount); - } - if (failedOpenCount.incrementAndGet() >= maximumAttempts) { - regionStates.updateRegionState(rt, State.FAILED_OPEN); - // remove the tracking info to save memory, also reset - // the count for next open initiative - failedOpenTracker.remove(encodedName); - } else { - // Handle this the same as if it were opened and then closed. - regionState = regionStates.updateRegionState(rt, State.CLOSED); - if (regionState != null) { - // When there are more than one region server a new RS is selected as the - // destination and the same is updated in the regionplan. (HBASE-5546) - try { - getRegionPlan(regionState.getRegion(), sn, true); - new ClosedRegionHandler(server, this, regionState.getRegion()).process(); - } catch (HBaseIOException e) { - LOG.warn("Failed to get region plan", e); - } - } - } - break; - - case RS_ZK_REGION_OPENING: - // Should see OPENING after we have asked it to OPEN or additional - // times after already being in state of OPENING - if (regionState == null - || !regionState.isPendingOpenOrOpeningOnServer(sn)) { - LOG.warn("Received OPENING for " + prettyPrintedRegionName - + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: " - + regionStates.getRegionState(encodedName)); - return; - } - // Transition to OPENING (or update stamp if already OPENING) - regionStates.updateRegionState(rt, State.OPENING); - break; - - case RS_ZK_REGION_OPENED: - // Should see OPENED after OPENING but possible after PENDING_OPEN. - if (regionState == null - || !regionState.isPendingOpenOrOpeningOnServer(sn)) { - LOG.warn("Received OPENED for " + prettyPrintedRegionName - + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: " - + regionStates.getRegionState(encodedName)); - - if (regionState != null) { - // Close it without updating the internal region states, - // so as not to create double assignments in unlucky scenarios - // mentioned in OpenRegionHandler#process - unassign(regionState.getRegion(), null, -1, null, false, sn); - } - return; - } - // Handle OPENED by removing from transition and deleted zk node - regionState = regionStates.updateRegionState(rt, State.OPEN); - if (regionState != null) { - failedOpenTracker.remove(encodedName); // reset the count, if any - new OpenedRegionHandler( - server, this, regionState.getRegion(), coordination, ord).process(); - updateOpenedRegionHandlerTracker(regionState.getRegion()); - } - break; - - default: - throw new IllegalStateException("Received event is not valid."); - } - } finally { - lock.unlock(); - } - } - - //For unit tests only - boolean wasClosedHandlerCalled(HRegionInfo hri) { - AtomicBoolean b = closedRegionHandlerCalled.get(hri); - //compareAndSet to be sure that unit tests don't see stale values. Means, - //we will return true exactly once unless the handler code resets to true - //this value. - return b == null ? false : b.compareAndSet(true, false); - } - - //For unit tests only - boolean wasOpenedHandlerCalled(HRegionInfo hri) { - AtomicBoolean b = openedRegionHandlerCalled.get(hri); - //compareAndSet to be sure that unit tests don't see stale values. Means, - //we will return true exactly once unless the handler code resets to true - //this value. - return b == null ? false : b.compareAndSet(true, false); - } - - //For unit tests only - void initializeHandlerTrackers() { - closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>(); - openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>(); - } - - void updateClosedRegionHandlerTracker(HRegionInfo hri) { - if (closedRegionHandlerCalled != null) { //only for unit tests this is true - closedRegionHandlerCalled.put(hri, new AtomicBoolean(true)); - } - } - - void updateOpenedRegionHandlerTracker(HRegionInfo hri) { - if (openedRegionHandlerCalled != null) { //only for unit tests this is true - openedRegionHandlerCalled.put(hri, new AtomicBoolean(true)); - } - } - // TODO: processFavoredNodes might throw an exception, for e.g., if the // meta could not be contacted/updated. We need to see how seriously to treat // this problem as. Should we fail the current assignment. We should be able @@ -1150,264 +577,6 @@ public class AssignmentManager extends ZooKeeperListener { } /** - * Handle a ZK unassigned node transition triggered by HBCK repair tool. - * <p> - * This is handled in a separate code path because it breaks the normal rules. - * @param rt - */ - @SuppressWarnings("deprecation") - private void handleHBCK(RegionTransition rt) { - String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName()); - LOG.info("Handling HBCK triggered transition=" + rt.getEventType() + - ", server=" + rt.getServerName() + ", region=" + - HRegionInfo.prettyPrint(encodedName)); - RegionState regionState = regionStates.getRegionTransitionState(encodedName); - switch (rt.getEventType()) { - case M_ZK_REGION_OFFLINE: - HRegionInfo regionInfo; - if (regionState != null) { - regionInfo = regionState.getRegion(); - } else { - try { - byte [] name = rt.getRegionName(); - Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion( - this.server.getShortCircuitConnection(), name); - regionInfo = p.getFirst(); - } catch (IOException e) { - LOG.info("Exception reading hbase:meta doing HBCK repair operation", e); - return; - } - } - LOG.info("HBCK repair is triggering assignment of region=" + - regionInfo.getRegionNameAsString()); - // trigger assign, node is already in OFFLINE so don't need to update ZK - assign(regionInfo, false); - break; - - default: - LOG.warn("Received unexpected region state from HBCK: " + rt.toString()); - break; - } - - } - - // ZooKeeper events - - /** - * New unassigned node has been created. - * - * <p>This happens when an RS begins the OPENING or CLOSING of a region by - * creating an unassigned node. - * - * <p>When this happens we must: - * <ol> - * <li>Watch the node for further events</li> - * <li>Read and handle the state in the node</li> - * </ol> - */ - @Override - public void nodeCreated(String path) { - handleAssignmentEvent(path); - } - - /** - * Existing unassigned node has had data changed. - * - * <p>This happens when an RS transitions from OFFLINE to OPENING, or between - * OPENING/OPENED and CLOSING/CLOSED. - * - * <p>When this happens we must: - * <ol> - * <li>Watch the node for further events</li> - * <li>Read and handle the state in the node</li> - * </ol> - */ - @Override - public void nodeDataChanged(String path) { - handleAssignmentEvent(path); - } - - - // We don't want to have two events on the same region managed simultaneously. - // For this reason, we need to wait if an event on the same region is currently in progress. - // So we track the region names of the events in progress, and we keep a waiting list. - private final Set<String> regionsInProgress = new HashSet<String>(); - // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need - // this as we want the events to be managed in the same order as we received them. - private final LinkedHashMultimap <String, RegionRunnable> - zkEventWorkerWaitingList = LinkedHashMultimap.create(); - - /** - * A specific runnable that works only on a region. - */ - private interface RegionRunnable extends Runnable{ - /** - * @return - the name of the region it works on. - */ - String getRegionName(); - } - - /** - * Submit a task, ensuring that there is only one task at a time that working on a given region. - * Order is respected. - */ - protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) { - - synchronized (regionsInProgress) { - // If we're there is already a task with this region, we add it to the - // waiting list and return. - if (regionsInProgress.contains(regRunnable.getRegionName())) { - synchronized (zkEventWorkerWaitingList){ - zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable); - } - return; - } - - // No event in progress on this region => we can submit a new task immediately. - regionsInProgress.add(regRunnable.getRegionName()); - zkEventWorkers.submit(new Runnable() { - @Override - public void run() { - try { - regRunnable.run(); - } finally { - // now that we have finished, let's see if there is an event for the same region in the - // waiting list. If it's the case, we can now submit it to the pool. - synchronized (regionsInProgress) { - regionsInProgress.remove(regRunnable.getRegionName()); - synchronized (zkEventWorkerWaitingList) { - java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get( - regRunnable.getRegionName()); - if (!waiting.isEmpty()) { - // We want the first object only. The only way to get it is through an iterator. - RegionRunnable toSubmit = waiting.iterator().next(); - zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit); - zkEventWorkersSubmit(toSubmit); - } - } - } - } - } - }); - } - } - - @Override - public void nodeDeleted(final String path) { - if (path.startsWith(watcher.assignmentZNode)) { - final String regionName = ZKAssign.getRegionName(watcher, path); - zkEventWorkersSubmit(new RegionRunnable() { - @Override - public String getRegionName() { - return regionName; - } - - @Override - public void run() { - Lock lock = locker.acquireLock(regionName); - try { - RegionState rs = regionStates.getRegionTransitionState(regionName); - if (rs == null) { - rs = regionStates.getRegionState(regionName); - if (rs == null || !rs.isMergingNew()) { - // MergingNew is an offline state - return; - } - } - - HRegionInfo regionInfo = rs.getRegion(); - String regionNameStr = regionInfo.getRegionNameAsString(); - LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs); - - boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING); - - ServerName serverName = rs.getServerName(); - if (serverManager.isServerOnline(serverName)) { - if (rs.isOnServer(serverName) - && (rs.isOpened() || rs.isSplitting())) { - regionOnline(regionInfo, serverName); - if (disabled) { - // if server is offline, no hurt to unassign again - LOG.info("Opened " + regionNameStr - + "but this table is disabled, triggering close of region"); - unassign(regionInfo); - } - } else if (rs.isMergingNew()) { - synchronized (regionStates) { - String p = regionInfo.getEncodedName(); - PairOfSameType<HRegionInfo> regions = mergingRegions.get(p); - if (regions != null) { - onlineMergingRegion(disabled, regions.getFirst(), serverName); - onlineMergingRegion(disabled, regions.getSecond(), serverName); - } - } - } - } - } finally { - lock.unlock(); - } - } - - private void onlineMergingRegion(boolean disabled, - final HRegionInfo hri, final ServerName serverName) { - RegionState regionState = regionStates.getRegionState(hri); - if (regionState != null && regionState.isMerging() - && regionState.isOnServer(serverName)) { - regionOnline(regionState.getRegion(), serverName); - if (disabled) { - unassign(hri); - } - } - } - }); - } - } - - /** - * New unassigned node has been created. - * - * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a - * region by creating a znode. - * - * <p>When this happens we must: - * <ol> - * <li>Watch the node for further children changed events</li> - * <li>Watch all new children for changed events</li> - * </ol> - */ - @Override - public void nodeChildrenChanged(String path) { - if (path.equals(watcher.assignmentZNode)) { - zkEventWorkers.submit(new Runnable() { - @Override - public void run() { - try { - // Just make sure we see the changes for the new znodes - List<String> children = - ZKUtil.listChildrenAndWatchForNewChildren( - watcher, watcher.assignmentZNode); - if (children != null) { - Stat stat = new Stat(); - for (String child : children) { - // if region is in transition, we already have a watch - // on it, so no need to watch it again. So, as I know for now, - // this is needed to watch splitting nodes only. - if (!regionStates.isRegionInTransition(child)) { - ZKAssign.getDataAndWatch(watcher, child, stat); - } - } - } - } catch (KeeperException e) { - server.abort("Unexpected ZK exception reading unassigned children", e); - } - } - }); - } - } - - - /** * Marks the region as online. Removes it from regions in transition and * updates the in-memory assignment information. * <p> @@ -1432,55 +601,6 @@ public class AssignmentManager extends ZooKeeperListener { } /** - * Pass the assignment event to a worker for processing. - * Each worker is a single thread executor service. The reason - * for just one thread is to make sure all events for a given - * region are processed in order. - * - * @param path - */ - private void handleAssignmentEvent(final String path) { - if (path.startsWith(watcher.assignmentZNode)) { - final String regionName = ZKAssign.getRegionName(watcher, path); - - zkEventWorkersSubmit(new RegionRunnable() { - @Override - public String getRegionName() { - return regionName; - } - - @Override - public void run() { - try { - Stat stat = new Stat(); - byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat); - if (data == null) return; - - RegionTransition rt = RegionTransition.parseFrom(data); - - // TODO: This code is tied to ZK anyway, so for now leaving it as is, - // will refactor when whole region assignment will be abstracted from ZK - BaseCoordinatedStateManager csm = - (BaseCoordinatedStateManager) server.getCoordinatedStateManager(); - OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination(); - - ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd = - new ZkOpenRegionCoordination.ZkOpenRegionDetails(); - zkOrd.setVersion(stat.getVersion()); - zkOrd.setServerName(csm.getServer().getServerName()); - - handleRegion(rt, openRegionCoordination, zkOrd); - } catch (KeeperException e) { - server.abort("Unexpected ZK exception reading unassigned node data", e); - } catch (DeserializationException e) { - server.abort("Unexpected exception deserializing node data", e); - } - } - }); - } - } - - /** * Marks the region as offline. Removes it from regions in transition and * removes in-memory assignment information. * <p> @@ -1492,15 +612,6 @@ public class AssignmentManager extends ZooKeeperListener { } public void offlineDisabledRegion(HRegionInfo regionInfo) { - if (useZKForAssignment) { - // Disabling so should not be reassigned, just delete the CLOSED node - LOG.debug("Table being disabled so deleting ZK node and removing from " + - "regions in transition, skipping assignment of region " + - regionInfo.getRegionNameAsString()); - String encodedName = regionInfo.getEncodedName(); - deleteNodeInStates(encodedName, "closed", null, - EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE); - } replicasToClose.remove(regionInfo); regionOffline(regionInfo); } @@ -1517,23 +628,19 @@ public class AssignmentManager extends ZooKeeperListener { * Updates the RegionState and sends the OPEN RPC. * <p> * This will only succeed if the region is in transition and in a CLOSED or - * OFFLINE state or not in transition (in-memory not zk), and of course, the - * chosen server is up and running (It may have just crashed!). If the - * in-memory checks pass, the zk node is forced to OFFLINE before assigning. + * OFFLINE state or not in transition, and of course, the + * chosen server is up and running (It may have just crashed!). * * @param region server to be assigned - * @param setOfflineInZK whether ZK node should be created/transitioned to an - * OFFLINE state before assigning the region */ - public void assign(HRegionInfo region, boolean setOfflineInZK) { - assign(region, setOfflineInZK, false); + public void assign(HRegionInfo region) { + assign(region, false); } /** * Use care with forceNewPlan. It could cause double assignment. */ - public void assign(HRegionInfo region, - boolean setOfflineInZK, boolean forceNewPlan) { + public void assign(HRegionInfo region, boolean forceNewPlan) { if (isDisabledorDisablingRegionInRIT(region)) { return; } @@ -1553,7 +660,7 @@ public class AssignmentManager extends ZooKeeperListener { + " is dead but not processed yet"); return; } - assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan); + assign(state, forceNewPlan); } } finally { lock.unlock(); @@ -1583,12 +690,8 @@ public class AssignmentManager extends ZooKeeperListener { List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>(); Map<String, Lock> locks = locker.acquireLocks(encodedNames); try { - AtomicInteger counter = new AtomicInteger(0); - Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>(); - OfflineCallback cb = new OfflineCallback( - watcher, destination, counter, offlineNodesVersions); - Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size()); - List<RegionState> states = new ArrayList<RegionState>(regions.size()); + Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount); + List<RegionState> states = new ArrayList<RegionState>(regionCount); for (HRegionInfo region : regions) { String encodedName = region.getEncodedName(); if (!isDisabledorDisablingRegionInRIT(region)) { @@ -1600,8 +703,7 @@ public class AssignmentManager extends ZooKeeperListener { + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName) + " is dead but not processed yet"); onDeadServer = true; - } else if (!useZKForAssignment - || asyncSetOfflineInZooKeeper(state, cb, destination)) { + } else { RegionPlan plan = new RegionPlan(region, state.getServerName(), destination); plans.put(encodedName, plan); states.add(state); @@ -1610,8 +712,8 @@ public class AssignmentManager extends ZooKeeperListener { } // Reassign if the region wasn't on a dead server if (!onDeadServer) { - LOG.info("failed to force region state to offline or " - + "failed to set it offline in ZK, will reassign later: " + region); + LOG.info("failed to force region state to offline, " + + "will reassign later: " + region); failedToOpenRegions.add(region); // assign individually later } } @@ -1621,21 +723,6 @@ public class AssignmentManager extends ZooKeeperListener { lock.unlock(); } - if (useZKForAssignment) { - // Wait until all unassigned nodes have been put up and watchers set. - int total = states.size(); - for (int oldCounter = 0; !server.isStopped();) { - int count = counter.get(); - if (oldCounter != count) { - LOG.debug(destination.toString() + " unassigned znodes=" + count + - " of total=" + total + "; oldCounter=" + oldCounter); - oldCounter = count; - } - if (count >= total) break; - Thread.sleep(5); - } - } - if (server.isStopped()) { return false; } @@ -1644,27 +731,18 @@ public class AssignmentManager extends ZooKeeperListener { // that unnecessary timeout on RIT is reduced. this.addPlans(plans); - List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos = - new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size()); + List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos = + new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size()); for (RegionState state: states) { HRegionInfo region = state.getRegion(); - String encodedRegionName = region.getEncodedName(); - Integer nodeVersion = offlineNodesVersions.get(encodedRegionName); - if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) { - LOG.warn("failed to offline in zookeeper: " + region); - failedToOpenRegions.add(region); // assign individually later - Lock lock = locks.remove(encodedRegionName); - lock.unlock(); - } else { - regionStates.updateRegionState( - region, State.PENDING_OPEN, destination); - List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; - if (this.shouldAssignRegionsWithFavoredNodes) { - favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region); - } - regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>( - region, nodeVersion, favoredNodes)); + regionStates.updateRegionState( + region, State.PENDING_OPEN, destination); + List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; + if (this.shouldAssignRegionsWithFavoredNodes) { + favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region); } + regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>( + region, favoredNodes)); } // Move on to open regions. @@ -1686,15 +764,8 @@ public class AssignmentManager extends ZooKeeperListener { RegionOpeningState openingState = regionOpeningStateList.get(k); if (openingState != RegionOpeningState.OPENED) { HRegionInfo region = regionOpenInfos.get(k).getFirst(); - if (openingState == RegionOpeningState.ALREADY_OPENED) { - processAlreadyOpenedRegion(region, destination); - } else if (openingState == RegionOpeningState.FAILED_OPENING) { - // Failed opening this region, reassign it later - failedToOpenRegions.add(region); - } else { - LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state " - + openingState + " in assigning region " + region); - } + // Failed opening this region, reassign it later + failedToOpenRegions.add(region); } } break; @@ -1771,8 +842,7 @@ public class AssignmentManager extends ZooKeeperListener { * on an unexpected server scenario, for an example) */ private void unassign(final HRegionInfo region, - final RegionState state, final int versionOfClosingNode, - final ServerName dest, final boolean transitionInZK, + final RegionState state, final ServerName dest, final ServerName src) { ServerName server = src; if (state != null) { @@ -1788,10 +858,6 @@ public class AssignmentManager extends ZooKeeperListener { if (!serverManager.isServerOnline(server)) { LOG.debug("Offline " + region.getRegionNameAsString() + ", no need to unassign since it's on a dead server: " + server); - if (transitionInZK) { - // delete the node. if no node exists need not bother. - deleteClosingOrClosedNode(region, server); - } if (state != null) { regionOffline(region); } @@ -1799,16 +865,9 @@ public class AssignmentManager extends ZooKeeperListener { } try { // Send CLOSE RPC - if (serverManager.sendRegionClose(server, region, - versionOfClosingNode, dest, transitionInZK)) { + if (serverManager.sendRegionClose(server, region, dest)) { LOG.debug("Sent CLOSE to " + server + " for region " + region.getRegionNameAsString()); - if (useZKForAssignment && !transitionInZK && state != null) { - // Retry to make sure the region is - // closed so as to avoid double assignment. - unassign(region, state, versionOfClosingNode, - dest, transitionInZK, src); - } return; } // This never happens. Currently regionserver close always return true. @@ -1825,9 +884,6 @@ public class AssignmentManager extends ZooKeeperListener { || t instanceof ServerNotRunningYetException) { LOG.debug("Offline " + region.getRegionNameAsString() + ", it's not any more on " + server, t); - if (transitionInZK) { - deleteClosingOrClosedNode(region, server); - } if (state != null) { regionOffline(region); } @@ -1840,9 +896,6 @@ public class AssignmentManager extends ZooKeeperListener { sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); } else { - // RS is already processing this region, only need to update the timestamp - LOG.debug("update " + state + " the timestamp."); - state.updateTimestampToNow(); if (maxWaitTime < 0) { maxWaitTime = EnvironmentEdgeManager.currentTimeMillis() @@ -1898,7 +951,6 @@ public class AssignmentManager extends ZooKeeperListener { state = regionStates.createRegionState(region); } - ServerName sn = state.getServerName(); if (forceNewPlan && LOG.isDebugEnabled()) { LOG.debug("Force region state offline " + state); } @@ -1916,7 +968,7 @@ public class AssignmentManager extends ZooKeeperListener { } case FAILED_CLOSE: case FAILED_OPEN: - unassign(region, state, -1, null, false, null); + unassign(region, state, null, null); state = regionStates.getRegionState(region); if (state.isFailedClose()) { // If we can't close the region, we can't re-assign @@ -1926,21 +978,6 @@ public class AssignmentManager extends ZooKeeperListener { return null; } case OFFLINE: - // This region could have been open on this server - // for a while. If the server is dead and not processed - // yet, we can move on only if the meta shows the - // region is not on this server actually, or on a server - // not dead, or dead and processed already. - // In case not using ZK, we don't need this check because - // we have the latest info in memory, and the caller - // will do another round checking any way. - if (useZKForAssignment - && regionStates.isServerDeadAndNotProcessed(sn) - && wasRegionOnDeadServerByMeta(region, sn)) { - LOG.info("Skip assigning " + region.getRegionNameAsString() - + ", it is on a dead but not processed yet server: " + sn); - return null; - } case CLOSED: break; default: @@ -1951,49 +988,15 @@ public class AssignmentManager extends ZooKeeperListener { return state; } - @SuppressWarnings("deprecation") - private boolean wasRegionOnDeadServerByMeta( - final HRegionInfo region, final ServerName sn) { - try { - if (region.isMetaRegion()) { - ServerName server = this.server.getMetaTableLocator(). - getMetaRegionLocation(this.server.getZooKeeper()); - return regionStates.isServerDeadAndNotProcessed(server); - } - while (!server.isStopped()) { - try { - this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper()); - Result r = MetaTableAccessor.getRegionResult(server.getShortCircuitConnection(), - region.getRegionName()); - if (r == null || r.isEmpty()) return false; - ServerName server = HRegionInfo.getServerName(r); - return regionStates.isServerDeadAndNotProcessed(server); - } catch (IOException ioe) { - LOG.info("Received exception accessing hbase:meta during force assign " - + region.getRegionNameAsString() + ", retrying", ioe); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.info("Interrupted accessing hbase:meta", e); - } - // Call is interrupted or server is stopped. - return regionStates.isServerDeadAndNotProcessed(sn); - } - /** * Caller must hold lock on the passed <code>state</code> object. * @param state - * @param setOfflineInZK * @param forceNewPlan */ - private void assign(RegionState state, - final boolean setOfflineInZK, final boolean forceNewPlan) { + private void assign(RegionState state, boolean forceNewPlan) { long startTime = EnvironmentEdgeManager.currentTimeMillis(); try { Configuration conf = server.getConfiguration(); - RegionState currentState = state; - int versionOfOfflineNode = -1; RegionPlan plan = null; long maxWaitTime = -1; HRegionInfo region = state.getRegion(); @@ -2027,14 +1030,6 @@ public class AssignmentManager extends ZooKeeperListener { regionStates.updateRegionState(region, State.FAILED_OPEN); return; } - if (setOfflineInZK && versionOfOfflineNode == -1) { - // get the version of the znode after setting it to OFFLINE. - // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE - versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination()); - if (versionOfOfflineNode != -1) { - if (isDisabledorDisablingRegionInRIT(region)) { - return; - } // In case of assignment from EnableTableHandler table state is ENABLING. Any how // EnableTableHandler will set ENABLED after assigning all the table regions. If we // try to set to ENABLED directly then client API may think table is enabled. @@ -2047,22 +1042,10 @@ public class AssignmentManager extends ZooKeeperListener { LOG.debug("Setting table " + tableName + " to ENABLED state."); setEnabledTable(tableName); } - } - } - if (setOfflineInZK && versionOfOfflineNode == -1) { - LOG.info("Unable to set offline in ZooKeeper to assign " + region); - // Setting offline in ZK must have been failed due to ZK racing or some - // exception which may make the server to abort. If it is ZK racing, - // we should retry since we already reset the region state, - // existing (re)assignment will fail anyway. - if (!server.isAborted()) { - continue; - } - } LOG.info("Assigning " + region.getRegionNameAsString() + " to " + plan.getDestination().toString()); // Transition RegionState to PENDING_OPEN - currentState = regionStates.updateRegionState(region, + regionStates.updateRegionState(region, State.PENDING_OPEN, plan.getDestination()); boolean needNewPlan; @@ -2074,7 +1057,7 @@ public class AssignmentManager extends ZooKeeperListener { favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region); } regionOpenState = serverManager.sendRegionOpen( - plan.getDestination(), region, versionOfOfflineNode, favoredNodes); + plan.getDestination(), region, favoredNodes); if (regionOpenState == RegionOpeningState.FAILED_OPENING) { // Failed opening this region, looping again on a new server. @@ -2084,9 +1067,6 @@ public class AssignmentManager extends ZooKeeperListener { "try=" + i + " of " + this.maximumAttempts); } else { // we're done - if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { - processAlreadyOpenedRegion(region, plan.getDestination()); - } return; } @@ -2186,8 +1166,7 @@ public class AssignmentManager extends ZooKeeperListener { // Clean out plan we failed execute and one that doesn't look like it'll // succeed anyways; we need a new plan! // Transition back to OFFLINE - currentState = regionStates.updateRegionState(region, State.OFFLINE); - versionOfOfflineNode = -1; + regionStates.updateRegionState(region, State.OFFLINE); plan = newPlan; } else if(plan.getDestination().equals(newPlan.getDestination()) && previousException instanceof FailedServerException) { @@ -2213,17 +1192,6 @@ public class AssignmentManager extends ZooKeeperListener { } } - private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) { - // Remove region from in-memory transition and unassigned node from ZK - // While trying to enable the table the regions of the table were - // already enabled. - LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString() - + " to " + sn); - String encodedName = region.getEncodedName(); - deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE); - regionStates.regionOnline(region, sn); - } - private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { if (this.tableStateManager.isTableState(region.getTable(), ZooKeeperProtos.Table.State.DISABLED, @@ -2237,37 +1205,6 @@ public class AssignmentManager extends ZooKeeperListener { } /** - * Set region as OFFLINED up in zookeeper - * - * @param state - * @return the version of the offline node if setting of the OFFLINE node was - * successful, -1 otherwise. - */ - private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) { - if (!state.isClosed() && !state.isOffline()) { - String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE."; - this.server.abort(msg, new IllegalStateException(msg)); - return -1; - } - regionStates.updateRegionState(state.getRegion(), State.OFFLINE); - int versionOfOfflineNode; - try { - // get the version after setting the znode to OFFLINE - versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher, - state.getRegion(), destination); - if (versionOfOfflineNode == -1) { - LOG.warn("Attempted to create/force node into OFFLINE state before " - + "completing assignment but failed to do so for " + state); - return -1; - } - } catch (KeeperException e) { - server.abort("Unexpected ZK exception creating/setting node OFFLINE", e); - return -1; - } - return versionOfOfflineNode; - } - - /** * @param region the region to assign * @return Plan for passed <code>region</code> (If none currently, it creates one or * if no servers to assign, it returns null). @@ -2388,7 +1325,6 @@ public class AssignmentManager extends ZooKeeperListener { String encodedName = region.getEncodedName(); // Grab the state of this region and synchronize on it - int versionOfClosingNode = -1; // We need a lock here as we're going to do a put later and we don't want multiple states // creation ReentrantLock lock = locker.acquireLock(encodedName); @@ -2404,56 +1340,12 @@ public class AssignmentManager extends ZooKeeperListener { // Offline region will be reassigned below return; } - // Create the znode in CLOSING state - try { - if (state == null || state.getServerName() == null) { - // We don't know where the region is, offline it. - // No need to send CLOSE RPC - LOG.warn("Attempting to unassign a region not in RegionStates" - + region.getRegionNameAsString() + ", offlined"); - regionOffline(region); - return; - } - if (useZKForAssignment) { - versionOfClosingNode = ZKAssign.createNodeClosing( - watcher, region, state.getServerName()); - if (versionOfClosingNode == -1) { - LOG.info("Attempting to unassign " + - region.getRegionNameAsString() + " but ZK closing node " - + "can't be created."); - reassign = false; // not unassigned at all - return; - } - } - } catch (KeeperException e) { - if (e instanceof NodeExistsException) { - // Handle race between master initiated close and regionserver - // orchestrated splitting. See if existing node is in a - // SPLITTING or SPLIT state. If so, the regionserver started - // an op on node before we could get our CLOSING in. Deal. - NodeExistsException nee = (NodeExistsException)e; - String path = nee.getPath(); - try { - if (isSplitOrSplittingOrMergedOrMerging(path)) { - LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " + - "skipping unassign because region no longer exists -- its split or merge"); - reassign = false; // no need to reassign for split/merged region - return; - } - } catch (KeeperException.NoNodeException ke) { - LOG.warn("Failed getData on SPLITTING/SPLIT at " + path + - "; presuming split and that the region to unassign, " + - encodedName + ", no longer exists -- confirm", ke); - return; - } catch (KeeperException ke) { - LOG.error("Unexpected zk state", ke); - } catch (DeserializationException de) { - LOG.error("Failed parse", de); - } - } - // If we get here, don't understand whats going on -- abort. - server.abort("Unexpected ZK exception creating node CLOSING", e); - reassign = false; // heading out already + if (state == null || state.getServerName() == null) { + // We don't know where the region is, offline it. + // No need to send CLOSE RPC + LOG.warn("Attempting to unassign a region not in RegionStates" + + region.getRegionNameAsString() + ", offlined"); + regionOffline(region); return; } state = regionStates.updateRegionState(region, State.PENDING_CLOSE); @@ -2468,7 +1360,6 @@ public class AssignmentManager extends ZooKeeperListener { if (state.isFailedClose()) { state = regionStates.updateRegionState(region, State.PENDING_CLOSE); } - state.updateTimestampToNow(); } else { LOG.debug("Attempting to unassign " + region.getRegionNameAsString() + " but it is " + @@ -2476,13 +1367,13 @@ public class AssignmentManager extends ZooKeeperListener { return; } - unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null); + unassign(region, state, dest, null); } finally { lock.unlock(); // Region is expected to be reassigned afterwards if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) { - assign(region, true); + assign(region); } } } @@ -2492,48 +1383,6 @@ public class AssignmentManager extends ZooKeeperListener { } /** - * @param region regioninfo of znode to be deleted. - */ - public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) { - String encodedName = region.getEncodedName(); - deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING, - EventType.RS_ZK_REGION_CLOSED); - } - - /** - * @param path - * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state. - * @throws KeeperException Can happen if the znode went away in meantime. - * @throws DeserializationException - */ - private boolean isSplitOrSplittingOrMergedOrMerging(final String path) - throws KeeperException, DeserializationException { - boolean result = false; - // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets - // cleaned up before we can get data from it. - byte [] data = ZKAssign.getData(watcher, path); - if (data == null) { - LOG.info("Node " + path + " is gone"); - return false; - } - RegionTransition rt = RegionTransition.parseFrom(data); - switch (rt.getEventType()) { - case RS_ZK_REQUEST_REGION_SPLIT: - case RS_ZK_REGION_SPLIT: - case RS_ZK_REGION_SPLITTING: - case RS_ZK_REQUEST_REGION_MERGE: - case RS_ZK_REGION_MERGED: - case RS_ZK_REGION_MERGING: - result = true; - break; - default: - LOG.info("Node " + path + " is in " + rt.getEventType()); - break; - } - return result; - } - - /** * Used by unit tests. Return the number of regions opened so far in the life * of the master. Increases by one every time the master opens a region * @return the counter value of the number of regions opened so far @@ -2577,8 +1426,8 @@ public class AssignmentManager extends ZooKeeperListener { * @throws KeeperException */ public void assignMeta() throws KeeperException { - this.server.getMetaTableLocator().deleteMetaLocation(this.watcher); - assign(HRegionInfo.FIRST_META_REGIONINFO, true); + this.server.getMetaTableLocator().deleteMetaLocation(this.server.getZooKeeper()); + assign(HRegionInfo.FIRST_META_REGIONINFO); } /** @@ -2735,30 +1584,6 @@ public class AssignmentManager extends ZooKeeperListener { } /** - * Wait until no regions in transition. - * @param timeout How long to wait. - * @return True if nothing in regions in transition. - * @throws InterruptedException - */ - boolean waitUntilNoRegionsInTransition(final long timeout) - throws InterruptedException { - // Blocks until there are no regions in transition. It is possible that - // there - // are regions in transition immediately after this returns but guarantees - // that if it returns without an exception that there was a period of time - // with no regions in transition from the point-of-view of the in-memory - // state of the Master. - final long endTime = System.currentTimeMillis() + timeout; - - while (!this.server.isStopped() && regionStates.isRegionsInTransition() - && endTime > System.currentTimeMillis()) { - regionStates.waitForUpdate(100); - } - - return !regionStates.isRegionsInTransition(); - } - - /** * Rebuild the list of user regions and assignment information. * <p> * Returns a set of servers that are not found to be online that hosted @@ -2829,16 +1654,11 @@ public class AssignmentManager extends ZooKeeperListener { if (!onlineServers.contains(regionLocation)) { // Region is located on a server that isn't online offlineServers.add(regionLocation); - if (useZKForAssignment) { - regionStates.regionOffline(regionInfo); - } } else if (!disabledOrEnablingTables.contains(tableName)) { // Region is being served and on an active server // add only if region not in disabled or enabling table regionStates.regionOnline(regionInfo, regionLocation); balancer.regionOnline(regionInfo, regionLocation); - } else if (useZKForAssignment) { - regionStates.regionOffline(regionInfo); } // need to enable the table if not disabled or disabling or enabling // this will be used in rolling restarts @@ -2911,21 +1731,12 @@ public class AssignmentManager extends ZooKeeperListener { /** * Processes list of dead servers from result of hbase:meta scan and regions in RIT - * <p> - * This is used for failover to recover the lost regions that belonged to - * RegionServers which failed while there was no active master or regions - * that were in RIT. - * <p> - * * * @param deadServers * The list of dead servers which failed while there was no active * master. Can be null. - * @throws IOException - * @throws KeeperException */ - private void processDeadServersAndRecoverLostRegions( - Set<ServerName> deadServers) throws IOException, KeeperException { + private void processDeadServers(Set<ServerName> deadServers) { if (deadServers != null && !deadServers.isEmpty()) { for (ServerName serverName: deadServers) { if (!serverManager.isServerDead(serverName)) { @@ -2934,36 +1745,27 @@ public class AssignmentManager extends ZooKeeperListener { } } - List<String> nodes = useZKForAssignment ? - ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode) - : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode); - if (nodes != null && !nodes.isEmpty()) { - for (String encodedRegionName : nodes) { - processRegionInTransition(encodedRegionName, null); + // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions + // in case the RPC call is not sent out yet before the master was shut down + // since we update the state before we send the RPC call. We can't update + // the state after the RPC call. Otherwise, we don't know what's happened + // to the region if the master dies right after the RPC call is out. + Map<String, RegionState> rits = regionStates.getRegionsInTransition(); + for (RegionState regionState: rits.values()) { + if (!serverManager.isServerOnline(regionState.getServerName())) { + continue; // SSH will handle it } - } else if (!useZKForAssignment) { - // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions - // in case the RPC call is not sent out yet before the master was shut down - // since we update the state before we send the RPC call. We can't update - // the state after the RPC call. Otherwise, we don't know what's happened - // to the region if the master dies right after the RPC call is out. - Map<String, RegionState> rits = regionStates.getRegionsInTransition(); - for (RegionState regionState: rits.values()) { - if (!serverManager.isServerOnline(regionState.getServerName())) { - continue; // SSH will handle it - } - State state = regionState.getState(); - LOG.info("Processing " + regionState); - switch (state) { - case PENDING_OPEN: - retrySendRegionOpen(regionState); - break; - case PENDING_CLOSE: - retrySendRegionClose(regionState); - break; - default: - // No process for other states - } + State state = regionState.getState(); + LOG.info("Processing " + regionState); + switch (state) { + case PENDING_OPEN: + retrySendRegionOpen(regionState); + break; + case PENDING_CLOSE: + retrySendRegionClose(regionState); + break; + default: + // No process for other states } } } @@ -2992,7 +1794,7 @@ public class AssignmentManager extends ZooKeeperListener { favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri); } RegionOpeningState regionOpenState = serverManager.sendRegionOpen( - serverName, hri, -1, favoredNodes); + serverName, hri, favoredNodes); if (regionOpenState == RegionOpeningState.FAILED_OPENING) { // Failed opening this region, this means the target server didn't get @@ -3045,7 +1847,7 @@ public class AssignmentManager extends ZooKeeperListener { while (serverManager.isServerOnline(serverName) && !server.isStopped() && !server.isAborted()) { try { - if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) { + if (!serverManager.sendRegionClose(serverName, hri, null)) { // This means the region is still on the target server LOG.debug("Got false in retry sendRegionClose for " + regionState + ", re-close it"); @@ -3179,43 +1981,26 @@ public class AssignmentManager extends ZooKeeperListener { /** * Check if the shutdown server carries the specific region. - * We have a bunch of places that store region location - * Those values aren't consistent. There is a delay of notification. - * The location from zookeeper unassigned node has the most recent data; - * but the node could be deleted after the region is opened by AM. - * The AM's info could be old when OpenedRegionHandler - * processing hasn't finished yet when server shutdown occurs. * @return whether the serverName currently hosts the region */ private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) { - RegionTransition rt = null; - try { - byte [] data = ZKAssign.getData(watcher, hri.getEncodedName()); - // This call can legitimately come by null - rt = data == null? null: RegionTransition.parseFrom(data); - } catch (KeeperException e) { - server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e); - } catch (DeserializationException e) { - server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e); - } - - ServerName addressFromZK = rt != null? rt.getServerName(): null; - if (addressFromZK != null) { - // if we get something from ZK, we will use the data - boolean matchZK = addressFromZK.equals(serverName); - LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK + - " current=" + serverName + ", matches=" + matchZK); - return matchZK; + RegionState regionState = regionStates.getRegionTransitionState(hri); + ServerName transitionAddr = regionState != null? regionState.getServerName(): null; + if (transitionAddr != null) { + boolean matchTransitionAddr = transitionAddr.equals(serverName); + LOG.debug("Checking region=" + hri.getRegionNameAsString() + + ", transitioning on server=" + matchTransitionAddr + + " server being checked: " + serverName + + ", matches=" + matchTransitionAddr); + return matchTransitionAddr; } - ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri); - boolean matchAM = (addressFromAM != null && - addressFromAM.equals(serverName)); - LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() + - " is on server=" + (addressFromAM != null ? addressFromAM : "null") + - " server being checked: " + serverName); - - return matchAM; + ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri); + boolean matchAssignedAddr = serverName.equals(assignedAddr); + LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() + + " is on server=" + assignedAddr + ", server being checked: " + + serverName); + return matchAssignedAddr; } /** @@ -3237,7 +2022,7 @@ public class AssignmentManager extends ZooKeeperListener { } } } - List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn); + List<HRegionInfo> regions = regionStates.serverOffline(sn); for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) { HRegionInfo hri = it.next(); String encodedName = hri.getEncodedName(); @@ -3255,12 +2040,6 @@ public class AssignmentManager extends ZooKeeperListener { + " on the dead server any more: " + sn); it.remove(); } else { - try { - // Delete the ZNode if exists - ZKAssign.deleteNodeFailSilent(watcher, hri); - } catch (KeeperException ke) { - server.abort("Unexpected ZK exception deleting node " + hri, ke); - } if (tableStateManager.isTableState(hri.getTable(), ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { regionStates.regionOffline(hri); @@ -3317,12 +2096,7 @@ public class AssignmentManager extends ZooKeeperListener { * Shutdown the threadpool executor service */ public void shutdown() { - // It's an immediate shutdown, so we're clearing the remaining tasks. - synchronized (zkEventWorkerWaitingList){ - zkEventWorkerWaitingList.clear(); - } threadPoolExecutorService.shutdownNow(); - zkEventWorkers.shutdownNow(); regionStateStore.stop(); } @@ -3339,65 +2113,6 @@ public class AssignmentManager extends ZooKeeperListener { } } - /** - * Set region as OFFLINED up in zookeeper asynchronously. - * @param state - * @return True if we succeeded, false otherwise (State was incorrect or failed - * updating zk). - */ - private boolean asyncSetOfflineInZooKeeper(final RegionState state, - final AsyncCallback.StringCallback cb, final ServerName destination) { - if (!state.isClosed() && !state.isOffline()) { - this.server.abort("Unexpected state trying to OFFLINE; " + state, - new IllegalStateException()); - return false; - } - regionStates.updateRegionState(state.getRegion(), State.OFFLINE); - try { - ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), - destination, cb, state); - } catch (KeeperException e) { - if (e instanceof NodeExistsException) { - LOG.warn("Node for " + state.getRegion() + " already exists"); - } else { - server.abort("Unexpected ZK exception creating/setting node OFFLINE", e); - } - return false; - } - return true; - } - - private boolean deleteNodeInStates(String encodedName, - String desc, ServerName sn, EventType... types) { - try { - for (EventType et: types) { - if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) { - return true; - } - } - LOG.info("Failed to delete the " + desc + " node for " - + encodedName + ". The node type may not match"); - } catch (NoNodeException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("The " + desc + " node for " + encodedName + " already deleted"); - } - } catch (KeeperException ke) { - server.abort("Unexpected ZK exception deleting " + desc - + " node for the region " + encodedName, ke); - } - return false; - } - - private void deleteMergingNode(String encodedName, ServerName sn) { - deleteNodeInStates
<TRUNCATED>
