http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java deleted file mode 100644 index 77ed84d..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.coordination; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; - -/** - * ZK-based implementation of {@link CloseRegionCoordination}. - */ -@InterfaceAudience.Private -public class ZkCloseRegionCoordination implements CloseRegionCoordination { - private static final Log LOG = LogFactory.getLog(ZkCloseRegionCoordination.class); - - private final static int FAILED_VERSION = -1; - - private CoordinatedStateManager csm; - private final ZooKeeperWatcher watcher; - - public ZkCloseRegionCoordination(CoordinatedStateManager csm, ZooKeeperWatcher watcher) { - this.csm = csm; - this.watcher = watcher; - } - - /** - * In ZK-based version we're checking for bad znode state, e.g. if we're - * trying to delete the znode, and it's not ours (version doesn't match). - */ - @Override - public boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd) { - ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; - - try { - return zkCrd.isPublishStatusInZk() && !ZKAssign.checkClosingState(watcher, - regionInfo, ((ZkCloseRegionDetails) crd).getExpectedVersion()); - } catch (KeeperException ke) { - csm.getServer().abort("Unrecoverable exception while checking state with zk " + - regionInfo.getRegionNameAsString() + ", still finishing close", ke); - throw new RuntimeException(ke); - } - } - - /** - * In ZK-based version we do some znodes transitioning. 
- */ - @Override - public void setClosedState(HRegion region, ServerName sn, CloseRegionDetails crd) { - ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; - String name = region.getRegionInfo().getRegionNameAsString(); - - if (zkCrd.isPublishStatusInZk()) { - if (setClosedState(region,sn, zkCrd)) { - LOG.debug("Set closed state in zk for " + name + " on " + sn); - } else { - LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + sn); - } - } - } - - /** - * Parse ZK-related fields from request. - */ - @Override - public CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request) { - ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd = - new ZkCloseRegionCoordination.ZkCloseRegionDetails(); - zkCrd.setPublishStatusInZk(request.getTransitionInZK()); - int versionOfClosingNode = -1; - if (request.hasVersionOfClosingNode()) { - versionOfClosingNode = request.getVersionOfClosingNode(); - } - zkCrd.setExpectedVersion(versionOfClosingNode); - - return zkCrd; - } - - /** - * No ZK tracking will be performed for that case. - * This method should be used when we want to construct CloseRegionDetails, - * but don't want any coordination on that (when it's initiated by regionserver), - * so no znode state transitions will be performed. - */ - @Override - public CloseRegionDetails getDetaultDetails() { - ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd = - new ZkCloseRegionCoordination.ZkCloseRegionDetails(); - zkCrd.setPublishStatusInZk(false); - zkCrd.setExpectedVersion(FAILED_VERSION); - - return zkCrd; - } - - /** - * Transition ZK node to CLOSED - * @param region HRegion instance being closed - * @param sn ServerName on which task runs - * @param zkCrd details about region closing operation. 
- * @return If the state is set successfully - */ - private boolean setClosedState(final HRegion region, - ServerName sn, - ZkCloseRegionDetails zkCrd) { - final int expectedVersion = zkCrd.getExpectedVersion(); - - try { - if (ZKAssign.transitionNodeClosed(watcher, region.getRegionInfo(), - sn, expectedVersion) == FAILED_VERSION) { - LOG.warn("Completed the CLOSE of a region but when transitioning from " + - " CLOSING to CLOSED got a version mismatch, someone else clashed " + - "so now unassigning"); - region.close(); - return false; - } - } catch (NullPointerException e) { - // I've seen NPE when table was deleted while close was running in unit tests. - LOG.warn("NPE during close -- catching and continuing...", e); - return false; - } catch (KeeperException e) { - LOG.error("Failed transitioning node from CLOSING to CLOSED", e); - return false; - } catch (IOException e) { - LOG.error("Failed to close region after failing to transition", e); - return false; - } - return true; - } - - /** - * ZK-based implementation. Has details about whether the state transition should be - * reflected in ZK, as well as expected version of znode. - */ - public static class ZkCloseRegionDetails implements CloseRegionCoordination.CloseRegionDetails { - - /** - * True if we are to update zk about the region close; if the close - * was orchestrated by master, then update zk. If the close is being run by - * the regionserver because its going down, don't update zk. - * */ - private boolean publishStatusInZk; - - /** - * The version of znode to compare when RS transitions the znode from - * CLOSING state. 
- */ - private int expectedVersion = FAILED_VERSION; - - public ZkCloseRegionDetails() { - } - - public ZkCloseRegionDetails(boolean publishStatusInZk, int expectedVersion) { - this.publishStatusInZk = publishStatusInZk; - this.expectedVersion = expectedVersion; - } - - public boolean isPublishStatusInZk() { - return publishStatusInZk; - } - - public void setPublishStatusInZk(boolean publishStatusInZk) { - this.publishStatusInZk = publishStatusInZk; - } - - public int getExpectedVersion() { - return expectedVersion; - } - - public void setExpectedVersion(int expectedVersion) { - this.expectedVersion = expectedVersion; - } - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index a5492a9..4d62e54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.coordination; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.CoordinatedStateException; import org.apache.hadoop.hbase.Server; @@ -32,23 +30,13 @@ import org.apache.zookeeper.KeeperException; */ @InterfaceAudience.Private public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { - private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class); protected Server server; protected ZooKeeperWatcher watcher; - protected SplitTransactionCoordination splitTransactionCoordination; - protected CloseRegionCoordination closeRegionCoordination; - protected OpenRegionCoordination openRegionCoordination; - protected RegionMergeCoordination regionMergeCoordination; @Override public void initialize(Server server) { this.server = server; this.watcher = server.getZooKeeper(); - - splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher); - closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher); - openRegionCoordination = new ZkOpenRegionCoordination(this, watcher); - regionMergeCoordination = new ZkRegionMergeCoordination(this, watcher); } @Override @@ -65,24 +53,4 @@ public 
class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { throw new CoordinatedStateException(e); } } - - @Override - public SplitTransactionCoordination getSplitTransactionCoordination() { - return splitTransactionCoordination; - } - - @Override - public CloseRegionCoordination getCloseRegionCoordination() { - return closeRegionCoordination; - } - - @Override - public OpenRegionCoordination getOpenRegionCoordination() { - return openRegionCoordination; - } - - @Override - public RegionMergeCoordination getRegionMergeCoordination() { - return regionMergeCoordination; - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java deleted file mode 100644 index 290ed09..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java +++ /dev/null @@ -1,414 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.coordination; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; - -/** - * ZK-based implementation of {@link OpenRegionCoordination}. - */ -@InterfaceAudience.Private -public class ZkOpenRegionCoordination implements OpenRegionCoordination { - private static final Log LOG = LogFactory.getLog(ZkOpenRegionCoordination.class); - - private CoordinatedStateManager coordination; - private final ZooKeeperWatcher watcher; - - public ZkOpenRegionCoordination(CoordinatedStateManager coordination, - ZooKeeperWatcher watcher) { - this.coordination = coordination; - this.watcher = watcher; - } - - //------------------------------- - // Region Server-side operations - //------------------------------- - - /** - * @param r Region we're working on. - * @return whether znode is successfully transitioned to OPENED state. 
- * @throws java.io.IOException - */ - @Override - public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord) throws IOException { - ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; - - boolean result = false; - HRegionInfo hri = r.getRegionInfo(); - final String name = hri.getRegionNameAsString(); - // Finally, Transition ZK node to OPENED - try { - if (ZKAssign.transitionNodeOpened(watcher, hri, - zkOrd.getServerName(), zkOrd.getVersion()) == -1) { - String warnMsg = "Completed the OPEN of region " + name + - " but when transitioning from " + " OPENING to OPENED "; - try { - String node = ZKAssign.getNodeName(watcher, hri.getEncodedName()); - if (ZKUtil.checkExists(watcher, node) < 0) { - // if the znode - coordination.getServer().abort(warnMsg + "the znode disappeared", null); - } else { - LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " + - "so now unassigning -- closing region on server: " + zkOrd.getServerName()); - } - } catch (KeeperException ke) { - coordination.getServer().abort(warnMsg, ke); - } - } else { - LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() + - " to OPENED in zk on " + zkOrd.getServerName()); - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + - " from OPENING to OPENED -- closing region", e); - } - return result; - } - - /** - * Transition ZK node from OFFLINE to OPENING. - * @param regionInfo region info instance - * @param ord - instance of open region details, for ZK implementation - * will include version Of OfflineNode that needs to be compared - * before changing the node's state from OFFLINE - * @return True if successful transition. 
- */ - @Override - public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo, - OpenRegionDetails ord) { - ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; - - // encoded name is used as znode encoded name in ZK - final String encodedName = regionInfo.getEncodedName(); - - // TODO: should also handle transition from CLOSED? - try { - // Initialize the znode version. - zkOrd.setVersion(ZKAssign.transitionNode(watcher, regionInfo, - zkOrd.getServerName(), EventType.M_ZK_REGION_OFFLINE, - EventType.RS_ZK_REGION_OPENING, zkOrd.getVersionOfOfflineNode())); - } catch (KeeperException e) { - LOG.error("Error transition from OFFLINE to OPENING for region=" + - encodedName, e); - zkOrd.setVersion(-1); - return false; - } - boolean b = isGoodVersion(zkOrd); - if (!b) { - LOG.warn("Failed transition from OFFLINE to OPENING for region=" + - encodedName); - } - return b; - } - - /** - * Update our OPENING state in zookeeper. - * Do this so master doesn't timeout this region-in-transition. - * We may lose the znode ownership during the open. Currently its - * too hard interrupting ongoing region open. Just let it complete - * and check we still have the znode after region open. - * - * @param context Some context to add to logs if failure - * @return True if successful transition. - */ - @Override - public boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo, - RegionServerServices rsServices, final String context) { - ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; - if (!isRegionStillOpening(regionInfo, rsServices)) { - LOG.warn("Open region aborted since it isn't opening any more"); - return false; - } - // If previous checks failed... do not try again. 
- if (!isGoodVersion(zkOrd)) return false; - String encodedName = regionInfo.getEncodedName(); - try { - zkOrd.setVersion(ZKAssign.confirmNodeOpening(watcher, - regionInfo, zkOrd.getServerName(), zkOrd.getVersion())); - } catch (KeeperException e) { - coordination.getServer().abort("Exception refreshing OPENING; region=" + encodedName + - ", context=" + context, e); - zkOrd.setVersion(-1); - return false; - } - boolean b = isGoodVersion(zkOrd); - if (!b) { - LOG.warn("Failed refreshing OPENING; region=" + encodedName + - ", context=" + context); - } - return b; - } - - /** - * Try to transition to open. - * - * This is not guaranteed to succeed, we just do our best. - * - * @param rsServices - * @param hri Region we're working on. - * @param ord Details about region open task - * @return whether znode is successfully transitioned to FAILED_OPEN state. - */ - @Override - public boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices, - final HRegionInfo hri, - OpenRegionDetails ord) { - ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; - boolean result = false; - final String name = hri.getRegionNameAsString(); - try { - LOG.info("Opening of region " + hri + " failed, transitioning" + - " from OFFLINE to FAILED_OPEN in ZK, expecting version " + - zkOrd.getVersionOfOfflineNode()); - if (ZKAssign.transitionNode( - rsServices.getZooKeeper(), hri, - rsServices.getServerName(), - EventType.M_ZK_REGION_OFFLINE, - EventType.RS_ZK_REGION_FAILED_OPEN, - zkOrd.getVersionOfOfflineNode()) == -1) { - LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. 
" + - "It's likely that the master already timed out this open " + - "attempt, and thus another RS already has the region."); - } else { - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e); - } - return result; - } - - private boolean isGoodVersion(ZkOpenRegionDetails zkOrd) { - return zkOrd.getVersion() != -1; - } - - /** - * This is not guaranteed to succeed, we just do our best. - * @param hri Region we're working on. - * @return whether znode is successfully transitioned to FAILED_OPEN state. - */ - @Override - public boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri, - OpenRegionDetails ord) { - ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; - boolean result = false; - final String name = hri.getRegionNameAsString(); - try { - LOG.info("Opening of region " + hri + " failed, transitioning" + - " from OPENING to FAILED_OPEN in ZK, expecting version " + zkOrd.getVersion()); - if (ZKAssign.transitionNode( - watcher, hri, - zkOrd.getServerName(), - EventType.RS_ZK_REGION_OPENING, - EventType.RS_ZK_REGION_FAILED_OPEN, - zkOrd.getVersion()) == -1) { - LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " + - "It's likely that the master already timed out this open " + - "attempt, and thus another RS already has the region."); - } else { - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + - " from OPENING to FAILED_OPEN", e); - } - return result; - } - - /** - * Parse ZK-related fields from request. 
- */ - @Override - public OpenRegionCoordination.OpenRegionDetails parseFromProtoRequest( - AdminProtos.OpenRegionRequest.RegionOpenInfo regionOpenInfo) { - ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = - new ZkOpenRegionCoordination.ZkOpenRegionDetails(); - - int versionOfOfflineNode = -1; - if (regionOpenInfo.hasVersionOfOfflineNode()) { - versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode(); - } - zkCrd.setVersionOfOfflineNode(versionOfOfflineNode); - zkCrd.setServerName(coordination.getServer().getServerName()); - - return zkCrd; - } - - /** - * No ZK tracking will be performed for that case. - * This method should be used when we want to construct CloseRegionDetails, - * but don't want any coordination on that (when it's initiated by regionserver), - * so no znode state transitions will be performed. - */ - @Override - public OpenRegionCoordination.OpenRegionDetails getDetailsForNonCoordinatedOpening() { - ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = - new ZkOpenRegionCoordination.ZkOpenRegionDetails(); - zkCrd.setVersionOfOfflineNode(-1); - zkCrd.setServerName(coordination.getServer().getServerName()); - - return zkCrd; - } - - //-------------------------- - // HMaster-side operations - //-------------------------- - @Override - public boolean commitOpenOnMasterSide(AssignmentManager assignmentManager, - HRegionInfo regionInfo, - OpenRegionDetails ord) { - boolean committedSuccessfully = true; - - // Code to defend against case where we get SPLIT before region open - // processing completes; temporary till we make SPLITs go via zk -- 0.92. 
- RegionState regionState = assignmentManager.getRegionStates() - .getRegionTransitionState(regionInfo.getEncodedName()); - boolean openedNodeDeleted = false; - if (regionState != null && regionState.isOpened()) { - openedNodeDeleted = deleteOpenedNode(regionInfo, ord); - if (!openedNodeDeleted) { - LOG.error("Znode of region " + regionInfo.getShortNameToLog() + " could not be deleted."); - } - } else { - LOG.warn("Skipping the onlining of " + regionInfo.getShortNameToLog() + - " because regions is NOT in RIT -- presuming this is because it SPLIT"); - } - if (!openedNodeDeleted) { - if (assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { - debugLog(regionInfo, "Opened region " - + regionInfo.getShortNameToLog() + " but " - + "this table is disabled, triggering close of region"); - committedSuccessfully = false; - } - } - - return committedSuccessfully; - } - - private boolean deleteOpenedNode(HRegionInfo regionInfo, OpenRegionDetails ord) { - ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; - int expectedVersion = zkOrd.getVersion(); - - debugLog(regionInfo, "Handling OPENED of " + - regionInfo.getShortNameToLog() + " from " + zkOrd.getServerName().toString() + - "; deleting unassigned node"); - try { - // delete the opened znode only if the version matches. - return ZKAssign.deleteNode(this.coordination.getServer().getZooKeeper(), - regionInfo.getEncodedName(), EventType.RS_ZK_REGION_OPENED, expectedVersion); - } catch(KeeperException.NoNodeException e){ - // Getting no node exception here means that already the region has been opened. 
- LOG.warn("The znode of the region " + regionInfo.getShortNameToLog() + - " would have already been deleted"); - return false; - } catch (KeeperException e) { - this.coordination.getServer().abort("Error deleting OPENED node in ZK (" + - regionInfo.getRegionNameAsString() + ")", e); - } - return false; - } - - private void debugLog(HRegionInfo region, String string) { - if (region.isMetaTable()) { - LOG.info(string); - } else { - LOG.debug(string); - } - } - - // Additional classes and helper methods - - /** - * ZK-based implementation. Has details about whether the state transition should be - * reflected in ZK, as well as expected version of znode. - */ - public static class ZkOpenRegionDetails implements OpenRegionCoordination.OpenRegionDetails { - - // We get version of our znode at start of open process and monitor it across - // the total open. We'll fail the open if someone hijacks our znode; we can - // tell this has happened if version is not as expected. - private volatile int version = -1; - - //version of the offline node that was set by the master - private volatile int versionOfOfflineNode = -1; - - /** - * Server name the handler is running on. 
- */ - private ServerName serverName; - - public ZkOpenRegionDetails() { - } - - public ZkOpenRegionDetails(int versionOfOfflineNode) { - this.versionOfOfflineNode = versionOfOfflineNode; - } - - public int getVersionOfOfflineNode() { - return versionOfOfflineNode; - } - - public void setVersionOfOfflineNode(int versionOfOfflineNode) { - this.versionOfOfflineNode = versionOfOfflineNode; - } - - public int getVersion() { - return version; - } - - public void setVersion(int version) { - this.version = version; - } - - @Override - public ServerName getServerName() { - return serverName; - } - - @Override - public void setServerName(ServerName serverName) { - this.serverName = serverName; - } - } - - private boolean isRegionStillOpening(HRegionInfo regionInfo, RegionServerServices rsServices) { - byte[] encodedName = regionInfo.getEncodedNameAsBytes(); - Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName); - return Boolean.TRUE.equals(action); // true means opening for RIT - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java deleted file mode 100644 index 8c18821..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java +++ /dev/null @@ -1,326 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.coordination; - -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.RegionTransition; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.data.Stat; - -public class ZkRegionMergeCoordination implements RegionMergeCoordination { - - private CoordinatedStateManager manager; - private final ZooKeeperWatcher watcher; - - private static final Log LOG = 
LogFactory.getLog(ZkRegionMergeCoordination.class); - - public ZkRegionMergeCoordination(CoordinatedStateManager manager, - ZooKeeperWatcher watcher) { - this.manager = manager; - this.watcher = watcher; - } - - /** - * ZK-based implementation. Has details about whether the state transition should be reflected in - * ZK, as well as expected version of znode. - */ - public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails { - private int znodeVersion; - - public ZkRegionMergeDetails() { - } - - public int getZnodeVersion() { - return znodeVersion; - } - - public void setZnodeVersion(int znodeVersion) { - this.znodeVersion = znodeVersion; - } - } - - @Override - public RegionMergeDetails getDefaultDetails() { - ZkRegionMergeDetails zstd = new ZkRegionMergeDetails(); - zstd.setZnodeVersion(-1); - return zstd; - } - - /** - * Wait for the merging node to be transitioned from pending_merge - * to merging by master. That's how we are sure master has processed - * the event and is good with us to move on. If we don't get any update, - * we periodically transition the node so that master gets the callback. - * If the node is removed or is not in pending_merge state any more, - * we abort the merge. 
- * @throws IOException - */ - - @Override - public void waitForRegionMergeTransaction(RegionServerServices services, - HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details) - throws IOException { - try { - int spins = 0; - Stat stat = new Stat(); - ServerName expectedServer = manager.getServer().getServerName(); - String node = mergedRegionInfo.getEncodedName(); - ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details; - while (!(manager.getServer().isStopped() || services.isStopping())) { - if (spins % 5 == 0) { - LOG.debug("Still waiting for master to process " + "the pending_merge for " + node); - ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails(); - transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), - region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE, - RS_ZK_REQUEST_REGION_MERGE); - } - Thread.sleep(100); - spins++; - byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat); - if (data == null) { - throw new IOException("Data is null, merging node " + node + " no longer exists"); - } - RegionTransition rt = RegionTransition.parseFrom(data); - EventType et = rt.getEventType(); - if (et == RS_ZK_REGION_MERGING) { - ServerName serverName = rt.getServerName(); - if (!serverName.equals(expectedServer)) { - throw new IOException("Merging node " + node + " is for " + serverName + ", not us " - + expectedServer); - } - byte[] payloadOfMerging = rt.getPayload(); - List<HRegionInfo> mergingRegions = - HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length); - assert mergingRegions.size() == 3; - HRegionInfo a = mergingRegions.get(1); - HRegionInfo b = mergingRegions.get(2); - HRegionInfo hri_a = region_a.getRegionInfo(); - HRegionInfo hri_b = region_b.getRegionInfo(); - if (!(hri_a.equals(a) && hri_b.equals(b))) { - throw new IOException("Merging node " + node + " is for " + a + ", " + b - + ", not expected regions: " + hri_a + ", " + 
hri_b); - } - // Master has processed it. - zdetails.setZnodeVersion(stat.getVersion()); - return; - } - if (et != RS_ZK_REQUEST_REGION_MERGE) { - throw new IOException("Merging node " + node + " moved out of merging to " + et); - } - } - // Server is stopping/stopped - throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped")); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed getting MERGING znode on " - + mergedRegionInfo.getRegionNameAsString(), e); - } - } - - /** - * Creates a new ephemeral node in the PENDING_MERGE state for the merged region. - * Create it ephemeral in case regionserver dies mid-merge. - * - * <p> - * Does not transition nodes from other states. If a node already exists for - * this region, a {@link NodeExistsException} will be thrown. - * - * @param region region to be created as offline - * @param serverName server event originates from - * @throws IOException - */ - @Override - public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName, - final HRegionInfo a, final HRegionInfo b) throws IOException { - LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName() - + " in PENDING_MERGE state")); - byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b); - RegionTransition rt = - RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), - serverName, payload); - String node = ZKAssign.getNodeName(watcher, region.getEncodedName()); - try { - if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) { - throw new IOException("Failed create of ephemeral " + node); - } - } catch (KeeperException e) { - throw new IOException(e); - } - } - - /* - * (non-Javadoc) - * @see - * org.apache.hadoop.hbase.regionserver.coordination.RegionMergeCoordination#clean(org.apache.hadoop - * .hbase.Server, 
org.apache.hadoop.hbase.HRegionInfo) - */ - @Override - public void clean(final HRegionInfo hri) { - try { - // Only delete if its in expected state; could have been hijacked. - if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager - .getServer().getServerName())) { - ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager - .getServer().getServerName()); - } - } catch (KeeperException.NoNodeException e) { - LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); - } catch (KeeperException e) { - manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); - } - } - - /* - * ZooKeeper implementation of finishRegionMergeTransaction - */ - @Override - public void completeRegionMergeTransaction(final RegionServerServices services, - HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd, - HRegion mergedRegion) throws IOException { - ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd; - if (manager.getServer() == null - || manager.getServer().getCoordinatedStateManager() == null) { - return; - } - // Tell master about merge by updating zk. If we fail, abort. - try { - transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(), - manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED); - - long startTime = EnvironmentEdgeManager.currentTimeMillis(); - int spins = 0; - // Now wait for the master to process the merge. We know it's done - // when the znode is deleted. The reason we keep tickling the znode is - // that it's possible for the master to miss an event. 
- do { - if (spins % 10 == 0) { - LOG.debug("Still waiting on the master to process the merge for " - + mergedRegionInfo.getEncodedName() + ", waited " - + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms"); - } - Thread.sleep(100); - // When this returns -1 it means the znode doesn't exist - transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(), - manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED); - spins++; - } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped() - && !services.isStopping()); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed telling master about merge " - + mergedRegionInfo.getEncodedName(), e); - } - // Leaving here, the mergedir with its dross will be in place but since the - // merge was successful, just leave it; it'll be cleaned when region_a is - // cleaned up by CatalogJanitor on master - } - - /* - * Zookeeper implementation of region merge confirmation - */ - @Override - public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b, - ServerName serverName, RegionMergeDetails rmd) throws IOException { - transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING, - RS_ZK_REGION_MERGING); - } - - /* - * Zookeeper implementation of region merge processing - */ - @Override - public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, - ServerName sn, RegionMergeDetails rmd) throws IOException { - transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE, - EventType.RS_ZK_REGION_MERGING); - } - - /** - * Transitions an existing ephemeral node for the specified region which is - * currently in the begin state to be in the end state. Master cleans up the - * final MERGE znode when it reads it (or if we crash, zk will clean it up). 
- * - * <p> - * Does not transition nodes from other states. If for some reason the node - * could not be transitioned, the method returns -1. If the transition is - * successful, the version of the node after transition is updated in details. - * - * <p> - * This method can fail and return false for three different reasons: - * <ul> - * <li>Node for this region does not exist</li> - * <li>Node for this region is not in the begin state</li> - * <li>After verifying the begin state, update fails because of wrong version - * (this should never actually happen since an RS only does this transition - * following a transition to the begin state. If two RS are conflicting, one would - * fail the original transition to the begin state and not this transition)</li> - * </ul> - * - * <p> - * Does not set any watches. - * - * <p> - * This method should only be used by a RegionServer when merging two regions. - * - * @param merged region to be transitioned to opened - * @param a merging region A - * @param b merging region B - * @param serverName server event originates from - * @param rmd region merge details - * @param beginState the expected current state the node should be - * @param endState the state to be transition to - * @throws IOException - */ - private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b, - ServerName serverName, RegionMergeDetails rmd, final EventType beginState, - final EventType endState) throws IOException { - ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd; - byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b); - try { - zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState, - endState, zrmd.getZnodeVersion(), payload)); - } catch (KeeperException e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java index ddab430..88e5c2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java @@ -45,7 +45,7 @@ public class AssignCallable implements Callable<Object> { @Override public Object call() throws Exception { - assignmentManager.assign(hri, true, newPlan); + assignmentManager.assign(hri, newPlan); return null; } }
