http://git-wip-us.apache.org/repos/asf/hbase/blob/db1dcf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java deleted file mode 100644 index 3600fe0..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java +++ /dev/null @@ -1,906 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.master.procedure; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaMutationAnnotation; -import org.apache.hadoop.hbase.RegionLoad; -import org.apache.hadoop.hbase.ServerLoad; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.UnknownRegionException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.exceptions.MergeRegionException; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.CatalogJanitor; -import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.MasterFileSystem; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.StoreFileInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; -import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; - -/** - * The procedure to Merge a region in a table. - */ -@InterfaceAudience.Private -public class MergeTableRegionsProcedure - extends AbstractStateMachineTableProcedure<MergeTableRegionsState> { - private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class); - - private Boolean traceEnabled; - private AssignmentManager assignmentManager; - private int timeout; - private ServerName regionLocation; - private String regionsToMergeListFullName; - private String regionsToMergeListEncodedName; - - private HRegionInfo [] regionsToMerge; - private HRegionInfo mergedRegionInfo; - private boolean forcible; - - public MergeTableRegionsProcedure() { - this.traceEnabled = isTraceEnabled(); - this.assignmentManager = null; - this.timeout = -1; - this.regionLocation = null; - this.regionsToMergeListFullName = null; - this.regionsToMergeListEncodedName = null; - } - - public MergeTableRegionsProcedure( - final MasterProcedureEnv env, - final HRegionInfo[] regionsToMerge, - final boolean forcible) throws IOException { - super(env); - this.traceEnabled = isTraceEnabled(); - this.assignmentManager = getAssignmentManager(env); - // For now, we only merge 2 regions. It could be extended to more than 2 regions in - // the future. 
- assert(regionsToMerge.length == 2); - assert(regionsToMerge[0].getTable() == regionsToMerge[1].getTable()); - this.regionsToMerge = regionsToMerge; - this.forcible = forcible; - - this.timeout = -1; - this.regionsToMergeListFullName = getRegionsToMergeListFullNameString(); - this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString(); - - // Check daughter regions and make sure that we have valid daughter regions before - // doing the real work. - checkDaughterRegions(); - // WARN: make sure there is no parent region of the two merging regions in - // hbase:meta If exists, fixing up daughters would cause daughter regions(we - // have merged one) online again when we restart master, so we should clear - // the parent region to prevent the above case - // Since HBASE-7721, we don't need fix up daughters any more. so here do - // nothing - setupMergedRegionInfo(); - } - - @Override - protected Flow executeFromState( - final MasterProcedureEnv env, - final MergeTableRegionsState state) throws InterruptedException { - if (isTraceEnabled()) { - LOG.trace(this + " execute state=" + state); - } - - try { - switch (state) { - case MERGE_TABLE_REGIONS_PREPARE: - prepareMergeRegion(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS); - break; - case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS: - if (MoveRegionsToSameRS(env)) { - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION); - } else { - LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString() - + ", because can't move them to the same RS"); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION); - } - break; - case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: - preMergeRegions(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE); - break; - case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE: - setRegionStateToMerging(env); - 
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS); - break; - case MERGE_TABLE_REGIONS_CLOSE_REGIONS: - closeRegionsForMerge(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION); - break; - case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: - createMergedRegion(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION); - break; - case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: - preMergeRegionsCommit(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META); - break; - case MERGE_TABLE_REGIONS_UPDATE_META: - updateMetaForMergedRegions(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION); - break; - case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: - postMergeRegionsCommit(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION); - break; - case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: - openMergedRegions(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION); - break; - case MERGE_TABLE_REGIONS_POST_OPERATION: - postCompletedMergeRegions(env); - return Flow.NO_MORE_STATE; - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); - } - } catch (IOException e) { - LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() + - " in the table " + getTableName() + " (in state=" + state + ")", e); - - setFailure("master-merge-regions", e); - } - return Flow.HAS_MORE_STATE; - } - - @Override - protected void rollbackState( - final MasterProcedureEnv env, - final MergeTableRegionsState state) throws IOException, InterruptedException { - if (isTraceEnabled()) { - LOG.trace(this + " rollback state=" + state); - } - - try { - switch (state) { - case MERGE_TABLE_REGIONS_POST_OPERATION: - case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: - case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: - case 
MERGE_TABLE_REGIONS_UPDATE_META: - String msg = this + " We are in the " + state + " state." - + " It is complicated to rollback the merge operation that region server is working on." - + " Rollback is not supported and we should let the merge operation to complete"; - LOG.warn(msg); - // PONR - throw new UnsupportedOperationException(this + " unhandled state=" + state); - case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: - break; - case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: - cleanupMergedRegion(env); - break; - case MERGE_TABLE_REGIONS_CLOSE_REGIONS: - rollbackCloseRegionsForMerge(env); - break; - case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE: - setRegionStateToRevertMerging(env); - break; - case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: - postRollBackMergeRegions(env); - break; - case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS: - break; // nothing to rollback - case MERGE_TABLE_REGIONS_PREPARE: - break; // nothing to rollback - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); - } - } catch (Exception e) { - // This will be retried. Unless there is a bug in the code, - // this should be just a "temporary error" (e.g. network down) - LOG.warn("Failed rollback attempt step " + state + " for merging the regions " - + getRegionsToMergeListFullNameString() + " in table " + getTableName(), e); - throw e; - } - } - - /* - * Check whether we are in the state that can be rollback - */ - @Override - protected boolean isRollbackSupported(final MergeTableRegionsState state) { - switch (state) { - case MERGE_TABLE_REGIONS_POST_OPERATION: - case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: - case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: - case MERGE_TABLE_REGIONS_UPDATE_META: - // It is not safe to rollback if we reach to these states. 
- return false; - default: - break; - } - return true; - } - - @Override - protected MergeTableRegionsState getState(final int stateId) { - return MergeTableRegionsState.forNumber(stateId); - } - - @Override - protected int getStateId(final MergeTableRegionsState state) { - return state.getNumber(); - } - - @Override - protected MergeTableRegionsState getInitialState() { - return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE; - } - - @Override - public void serializeStateData(final OutputStream stream) throws IOException { - super.serializeStateData(stream); - - MasterProcedureProtos.MergeTableRegionsStateData.Builder mergeTableRegionsMsg = - MasterProcedureProtos.MergeTableRegionsStateData.newBuilder() - .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) - .setMergedRegionInfo(HRegionInfo.convert(mergedRegionInfo)) - .setForcible(forcible); - for (HRegionInfo hri: regionsToMerge) { - mergeTableRegionsMsg.addRegionInfo(HRegionInfo.convert(hri)); - } - mergeTableRegionsMsg.build().writeDelimitedTo(stream); - } - - @Override - public void deserializeStateData(final InputStream stream) throws IOException { - super.deserializeStateData(stream); - - MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg = - MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream); - setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo())); - - assert(mergeTableRegionsMsg.getRegionInfoCount() == 2); - regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()]; - for (int i = 0; i < regionsToMerge.length; i++) { - regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i)); - } - - mergedRegionInfo = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo()); - } - - @Override - public void toStringClassDetails(StringBuilder sb) { - sb.append(getClass().getSimpleName()); - sb.append(" (table="); - sb.append(getTableName()); - sb.append(" regions="); - 
sb.append(getRegionsToMergeListFullNameString()); - sb.append(" forcible="); - sb.append(forcible); - sb.append(")"); - } - - @Override - protected LockState acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) { - return LockState.LOCK_EVENT_WAIT; - } - return env.getProcedureScheduler().waitRegions(this, getTableName(), - regionsToMerge[0], regionsToMerge[1])? - LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED; - } - - @Override - protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureScheduler().wakeRegions(this, getTableName(), - regionsToMerge[0], regionsToMerge[1]); - } - - @Override - public TableName getTableName() { - return regionsToMerge[0].getTable(); - } - - @Override - public TableOperationType getTableOperationType() { - return TableOperationType.MERGE; - } - - /** - * check daughter regions - * @throws IOException - */ - private void checkDaughterRegions() throws IOException { - // Note: the following logic assumes that we only have 2 regions to merge. In the future, - // if we want to extend to more than 2 regions, the code needs to modify a little bit. - // - if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || - regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { - throw new MergeRegionException("Can't merge non-default replicas"); - } - - if (!HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) { - String msg = "Trying to merge non-adjacent regions " - + getRegionsToMergeListFullNameString() + " where forcible = " + forcible; - LOG.warn(msg); - if (!forcible) { - throw new DoNotRetryIOException(msg); - } - } - } - - /** - * Prepare merge and do some check - * @param env MasterProcedureEnv - * @throws IOException - */ - private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException { - // Note: the following logic assumes that we only have 2 regions to merge. 
In the future, - // if we want to extend to more than 2 regions, the code needs to modify a little bit. - // - CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor(); - boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]); - if (regionAHasMergeQualifier - || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) { - String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() - + ", because region " - + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1] - .getEncodedName()) + " has merge qualifier"; - LOG.warn(msg); - throw new MergeRegionException(msg); - } - - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName()); - RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName()); - if (regionStateA == null || regionStateB == null) { - throw new UnknownRegionException( - regionStateA == null ? - regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName()); - } - - if (!regionStateA.isOpened() || !regionStateB.isOpened()) { - throw new MergeRegionException( - "Unable to merge regions not online " + regionStateA + ", " + regionStateB); - } - } - - /** - * Create merged region info through the specified two regions - */ - private void setupMergedRegionInfo() { - long rid = EnvironmentEdgeManager.currentTime(); - // Regionid is timestamp. 
Merged region's id can't be less than that of - // merging regions else will insert at wrong location in hbase:meta - if (rid < regionsToMerge[0].getRegionId() || rid < regionsToMerge[1].getRegionId()) { - LOG.warn("Clock skew; merging regions id are " + regionsToMerge[0].getRegionId() - + " and " + regionsToMerge[1].getRegionId() + ", but current time here is " + rid); - rid = Math.max(regionsToMerge[0].getRegionId(), regionsToMerge[1].getRegionId()) + 1; - } - - byte[] startKey = null; - byte[] endKey = null; - // Choose the smaller as start key - if (regionsToMerge[0].compareTo(regionsToMerge[1]) <= 0) { - startKey = regionsToMerge[0].getStartKey(); - } else { - startKey = regionsToMerge[1].getStartKey(); - } - // Choose the bigger as end key - if (Bytes.equals(regionsToMerge[0].getEndKey(), HConstants.EMPTY_BYTE_ARRAY) - || (!Bytes.equals(regionsToMerge[1].getEndKey(), HConstants.EMPTY_BYTE_ARRAY) - && Bytes.compareTo(regionsToMerge[0].getEndKey(), regionsToMerge[1].getEndKey()) > 0)) { - endKey = regionsToMerge[0].getEndKey(); - } else { - endKey = regionsToMerge[1].getEndKey(); - } - - // Merged region is sorted between two merging regions in META - mergedRegionInfo = new HRegionInfo(getTableName(), startKey, endKey, false, rid); - } - - /** - * Move all regions to the same region server - * @param env MasterProcedureEnv - * @return whether target regions hosted by the same RS - * @throws IOException - */ - private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException { - // Make sure regions are on the same regionserver before send merge - // regions request to region server. - // - boolean onSameRS = isRegionsOnTheSameServer(env); - if (!onSameRS) { - // Note: the following logic assumes that we only have 2 regions to merge. In the future, - // if we want to extend to more than 2 regions, the code needs to modify a little bit. 
- // - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); - - RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]); - RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]); - if (loadOfRegionA != null && loadOfRegionB != null - && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) { - // switch regionsToMerge[0] and regionsToMerge[1] - HRegionInfo tmpRegion = this.regionsToMerge[0]; - this.regionsToMerge[0] = this.regionsToMerge[1]; - this.regionsToMerge[1] = tmpRegion; - ServerName tmpLocation = regionLocation; - regionLocation = regionLocation2; - regionLocation2 = tmpLocation; - } - - long startTime = EnvironmentEdgeManager.currentTime(); - - RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation); - LOG.info("Moving regions to same server for merge: " + regionPlan.toString()); - getAssignmentManager(env).balance(regionPlan); - do { - try { - Thread.sleep(20); - // Make sure check RIT first, then get region location, otherwise - // we would make a wrong result if region is online between getting - // region location and checking RIT - boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]); - regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); - onSameRS = regionLocation.equals(regionLocation2); - if (onSameRS || !isRIT) { - // Regions are on the same RS, or regionsToMerge[1] is not in - // RegionInTransition any more - break; - } - } catch (InterruptedException e) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(e); - throw iioe; - } - } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); - } - return onSameRS; - } - - /** - * Pre merge region action - * @param env MasterProcedureEnv - **/ - private void preMergeRegions(final MasterProcedureEnv env) 
throws IOException { - final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); - if (cpHost != null) { - boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser()); - if (ret) { - throw new IOException( - "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge."); - } - } - } - - /** - * Action after rollback a merge table regions action. - * @param env MasterProcedureEnv - * @throws IOException - */ - private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException { - final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser()); - } - } - - /** - * Set the region states to MERGING state - * @param env MasterProcedureEnv - * @throws IOException - */ - public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException { - RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); - transition.setTransitionCode(TransitionCode.READY_TO_MERGE); - transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo)); - transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0])); - transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1])); - if (env.getMasterServices().getAssignmentManager().onRegionTransition( - getServerName(env), transition.build()) != null) { - throw new IOException("Failed to update region state to MERGING for " - + getRegionsToMergeListFullNameString()); - } - } - - /** - * Rollback the region state change - * @param env MasterProcedureEnv - * @throws IOException - */ - private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException { - RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); - transition.setTransitionCode(TransitionCode.MERGE_REVERTED); - transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo)); - transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0])); 
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1])); - String msg = env.getMasterServices().getAssignmentManager().onRegionTransition( - getServerName(env), transition.build()); - if (msg != null) { - // If daughter regions are online, the msg is coming from RPC retry. Ignore it. - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - if (!regionStates.isRegionOnline(regionsToMerge[0]) || - !regionStates.isRegionOnline(regionsToMerge[1])) { - throw new IOException("Failed to update region state for " - + getRegionsToMergeListFullNameString() - + " as part of operation for reverting merge. Error message: " + msg); - } - } - } - - /** - * Create merged region - * @param env MasterProcedureEnv - * @throws IOException - */ - private void createMergedRegion(final MasterProcedureEnv env) throws IOException { - final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); - final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable()); - final FileSystem fs = mfs.getFileSystem(); - HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( - env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false); - regionFs.createMergesDir(); - - mergeStoreFiles(env, regionFs, regionFs.getMergesDir()); - HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem( - env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false); - mergeStoreFiles(env, regionFs2, regionFs.getMergesDir()); - - regionFs.commitMergedRegion(mergedRegionInfo); - } - - /** - * Create reference file(s) of merging regions under the merges directory - * @param env MasterProcedureEnv - * @param regionFs region file system - * @param mergedDir the temp directory of merged region - * @throws IOException - */ - private void mergeStoreFiles( - final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir) - throws IOException { - final MasterFileSystem mfs = 
env.getMasterServices().getMasterFileSystem(); - final Configuration conf = env.getMasterConfiguration(); - final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); - - for (String family: regionFs.getFamilies()) { - final HColumnDescriptor hcd = htd.getFamily(family.getBytes()); - final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family); - - if (storeFiles != null && storeFiles.size() > 0) { - final CacheConfig cacheConf = new CacheConfig(conf, hcd); - for (StoreFileInfo storeFileInfo: storeFiles) { - // Create reference file(s) of the region in mergedDir - regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(), - storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true), - mergedDir); - } - } - } - } - - /** - * Clean up merged region - * @param env MasterProcedureEnv - * @throws IOException - */ - private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException { - final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); - final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable()); - final FileSystem fs = mfs.getFileSystem(); - HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( - env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false); - regionFs.cleanupMergedRegion(mergedRegionInfo); - } - - /** - * RPC to region server that host the regions to merge, ask for close these regions - * @param env MasterProcedureEnv - * @throws IOException - */ - private void closeRegionsForMerge(final MasterProcedureEnv env) throws IOException { - boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge( - getServerName(env), regionsToMerge[0], regionsToMerge[1]); - if (!success) { - throw new IOException("Close regions " + getRegionsToMergeListFullNameString() - + " for merging failed. 
Check region server log for more details."); - } - } - - /** - * Rollback close regions - * @param env MasterProcedureEnv - **/ - private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException { - // Check whether the region is closed; if so, open it in the same server - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - for(int i = 1; i < regionsToMerge.length; i++) { - RegionState state = regionStates.getRegionState(regionsToMerge[i]); - if (state != null && (state.isClosing() || state.isClosed())) { - env.getMasterServices().getServerManager().sendRegionOpen( - getServerName(env), - regionsToMerge[i], - ServerName.EMPTY_SERVER_LIST); - } - } - } - - /** - * Post merge region action - * @param env MasterProcedureEnv - **/ - private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException { - final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); - if (cpHost != null) { - @MetaMutationAnnotation - final List<Mutation> metaEntries = new ArrayList<>(); - boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser()); - - if (ret) { - throw new IOException( - "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge."); - } - try { - for (Mutation p : metaEntries) { - HRegionInfo.parseRegionName(p.getRow()); - } - } catch (IOException e) { - LOG.error("Row key of mutation from coprocessor is not parsable as region name." - + "Mutations from coprocessor should only be for hbase:meta table.", e); - throw e; - } - } - } - - /** - * Add merged region to META and delete original regions. 
- * @param env MasterProcedureEnv - * @throws IOException - */ - private void updateMetaForMergedRegions(final MasterProcedureEnv env) throws IOException { - RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); - transition.setTransitionCode(TransitionCode.MERGE_PONR); - transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo)); - transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0])); - transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1])); - // Add merged region and delete original regions - // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region - // will determine whether the region is merged or not in case of failures. - if (env.getMasterServices().getAssignmentManager().onRegionTransition( - getServerName(env), transition.build()) != null) { - throw new IOException("Failed to update meta to add merged region that merges " - + getRegionsToMergeListFullNameString()); - } - } - - /** - * Post merge region action - * @param env MasterProcedureEnv - **/ - private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException { - final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegionInfo, getUser()); - } - } - - /** - * Assign merged region - * @param env MasterProcedureEnv - * @throws IOException - * @throws InterruptedException - **/ - private void openMergedRegions(final MasterProcedureEnv env) - throws IOException, InterruptedException { - // Check whether the merged region is already opened; if so, - // this is retry and we should just ignore. 
- RegionState regionState = - getAssignmentManager(env).getRegionStates().getRegionState(mergedRegionInfo); - if (regionState != null && regionState.isOpened()) { - LOG.info("Skip opening merged region " + mergedRegionInfo.getRegionNameAsString() - + " as it is already opened."); - return; - } - - // TODO: The new AM should provide an API to force assign the merged region to the same RS - // as daughter regions; if the RS is unavailable, then assign to a different RS. - env.getMasterServices().getAssignmentManager().assignMergedRegion( - mergedRegionInfo, regionsToMerge[0], regionsToMerge[1]); - } - - /** - * Post merge region action - * @param env MasterProcedureEnv - **/ - private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException { - final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegionInfo, getUser()); - } - } - - private RegionLoad getRegionLoad( - final MasterProcedureEnv env, - final ServerName sn, - final HRegionInfo hri) { - ServerManager serverManager = env.getMasterServices().getServerManager(); - ServerLoad load = serverManager.getLoad(sn); - if (load != null) { - Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad(); - if (regionsLoad != null) { - return regionsLoad.get(hri.getRegionName()); - } - } - return null; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. 
- * @param env MasterProcedureEnv - * @return whether target regions hosted by the same RS - */ - private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{ - Boolean onSameRS = true; - int i = 0; - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]); - if (regionLocation != null) { - for(i = 1; i < regionsToMerge.length; i++) { - ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]); - if (regionLocation2 != null) { - if (onSameRS) { - onSameRS = regionLocation.equals(regionLocation2); - } - } else { - // At least one region is not online, merge will fail, no need to continue. - break; - } - } - if (i == regionsToMerge.length) { - // Finish checking all regions, return the result; - return onSameRS; - } - } - - // If reaching here, at least one region is not online. - String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() + - ", because region " + regionsToMerge[i].getEncodedName() + " is not online now."; - LOG.warn(msg); - throw new IOException(msg); - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param env MasterProcedureEnv - * @return assignmentManager - */ - private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) { - if (assignmentManager == null) { - assignmentManager = env.getMasterServices().getAssignmentManager(); - } - return assignmentManager; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. 
- * @param env MasterProcedureEnv - * @return timeout value - */ - private int getTimeout(final MasterProcedureEnv env) { - if (timeout == -1) { - timeout = env.getMasterConfiguration().getInt( - "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000); - } - return timeout; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param env MasterProcedureEnv - * @return serverName - */ - private ServerName getServerName(final MasterProcedureEnv env) { - if (regionLocation == null) { - regionLocation = - getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]); - } - return regionLocation; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param fullName whether return only encoded name - * @return region names in a list - */ - private String getRegionsToMergeListFullNameString() { - if (regionsToMergeListFullName == null) { - StringBuilder sb = new StringBuilder("["); - int i = 0; - while(i < regionsToMerge.length - 1) { - sb.append(regionsToMerge[i].getRegionNameAsString() + ", "); - i++; - } - sb.append(regionsToMerge[i].getRegionNameAsString() + " ]"); - regionsToMergeListFullName = sb.toString(); - } - return regionsToMergeListFullName; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. 
- * @return encoded region names - */ - private String getRegionsToMergeListEncodedNameString() { - if (regionsToMergeListEncodedName == null) { - StringBuilder sb = new StringBuilder("["); - int i = 0; - while(i < regionsToMerge.length - 1) { - sb.append(regionsToMerge[i].getEncodedName() + ", "); - i++; - } - sb.append(regionsToMerge[i].getEncodedName() + " ]"); - regionsToMergeListEncodedName = sb.toString(); - } - return regionsToMergeListEncodedName; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @return traceEnabled - */ - private Boolean isTraceEnabled() { - if (traceEnabled == null) { - traceEnabled = LOG.isTraceEnabled(); - } - return traceEnabled; - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db1dcf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java index 52bb4d5..622c19f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java @@ -21,17 +21,14 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @@ -97,7 +94,9 @@ public class ModifyColumnFamilyProcedure setNextState(ModifyColumnFamilyState.MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS); break; case MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS: - reOpenAllRegionsIfTableIsOnline(env); + if (env.getAssignmentManager().isTableEnabled(getTableName())) { + addChildProcedure(env.getAssignmentManager().createReopenProcedures(getTableName())); + } return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException(this + " 
unhandled state=" + state); @@ -265,7 +264,8 @@ public class ModifyColumnFamilyProcedure env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor); // Make sure regions are opened after table descriptor is updated. - reOpenAllRegionsIfTableIsOnline(env); + //reOpenAllRegionsIfTableIsOnline(env); + // TODO: NUKE ROLLBACK!!!! } /** @@ -281,26 +281,6 @@ public class ModifyColumnFamilyProcedure } /** - * Last action from the procedure - executed when online schema change is supported. - * @param env MasterProcedureEnv - * @throws IOException - */ - private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException { - // This operation only run when the table is enabled. - if (!env.getMasterServices().getTableStateManager() - .isTableState(getTableName(), TableState.State.ENABLED)) { - return; - } - - List<HRegionInfo> regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); - if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), regionInfoList)) { - LOG.info("Completed add column family operation on table " + getTableName()); - } else { - LOG.warn("Error on reopening the regions on table " + getTableName()); - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. 
* @return traceEnabled http://git-wip-us.apache.org/repos/asf/hbase/blob/db1dcf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index 6a70f62..20a6a03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -120,7 +120,10 @@ public class ModifyTableProcedure setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS); break; case MODIFY_TABLE_REOPEN_ALL_REGIONS: - reOpenAllRegionsIfTableIsOnline(env); + if (env.getAssignmentManager().isTableEnabled(getTableName())) { + addChildProcedure(env.getAssignmentManager() + .createReopenProcedures(getRegionInfoList(env))); + } return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); @@ -299,7 +302,8 @@ public class ModifyTableProcedure deleteFromFs(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor); // Make sure regions are opened after table descriptor is updated. - reOpenAllRegionsIfTableIsOnline(env); + //reOpenAllRegionsIfTableIsOnline(env); + // TODO: NUKE ROLLBACK!!!! } /** @@ -374,25 +378,6 @@ public class ModifyTableProcedure } /** - * Last action from the procedure - executed when online schema change is supported. - * @param env MasterProcedureEnv - * @throws IOException - */ - private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException { - // This operation only run when the table is enabled. 
- if (!env.getMasterServices().getTableStateManager() - .isTableState(getTableName(), TableState.State.ENABLED)) { - return; - } - - if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) { - LOG.info("Completed modify table operation on table " + getTableName()); - } else { - LOG.warn("Error on reopening the regions on table " + getTableName()); - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. * @return traceEnabled whether the trace is enabled @@ -430,7 +415,8 @@ public class ModifyTableProcedure private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException { if (regionInfoList == null) { - regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + regionInfoList = env.getAssignmentManager().getRegionStates() + .getRegionsOfTable(getTableName()); } return regionInfoList; } http://git-wip-us.apache.org/repos/asf/hbase/blob/db1dcf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java index 3777c79..21bd6c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java @@ -21,30 +21,26 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InterruptedIOException; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoordinatedStateException; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.RegionState.State; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; /** * Helper to synchronously wait on conditions. 
@@ -64,19 +60,93 @@ public final class ProcedureSyncWait { T evaluate() throws IOException; } + private static class ProcedureFuture implements Future<byte[]> { + private final ProcedureExecutor<MasterProcedureEnv> procExec; + private final long procId; + + private boolean hasResult = false; + private byte[] result = null; + + public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, long procId) { + this.procExec = procExec; + this.procId = procId; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { return false; } + + @Override + public boolean isCancelled() { return false; } + + @Override + public boolean isDone() { return hasResult; } + + @Override + public byte[] get() throws InterruptedException, ExecutionException { + if (hasResult) return result; + try { + return waitForProcedureToComplete(procExec, procId, Long.MAX_VALUE); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + + @Override + public byte[] get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (hasResult) return result; + try { + result = waitForProcedureToComplete(procExec, procId, unit.toMillis(timeout)); + hasResult = true; + return result; + } catch (TimeoutIOException e) { + throw new TimeoutException(e.getMessage()); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + } + + public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec, + final Procedure proc) { + if (proc.isInitializing()) { + procExec.submitProcedure(proc); + } + return new ProcedureFuture(procExec, proc.getProcId()); + } + public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure proc) throws IOException { - long procId = procExec.submitProcedure(proc); - return waitForProcedureToComplete(procExec, procId); + if (proc.isInitializing()) { + procExec.submitProcedure(proc); + } + return 
waitForProcedureToCompleteIOE(procExec, proc.getProcId(), Long.MAX_VALUE); } - private static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec, - final long procId) throws IOException { - while (!procExec.isFinished(procId) && procExec.isRunning()) { - // TODO: add a config to make it tunable - // Dev Consideration: are we waiting forever, or we can set up some timeout value? - Threads.sleepWithoutInterrupt(250); + public static byte[] waitForProcedureToCompleteIOE( + final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout) + throws IOException { + try { + return waitForProcedureToComplete(procExec, procId, timeout); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); } + } + + public static byte[] waitForProcedureToComplete( + final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout) + throws IOException { + waitFor(procExec.getEnvironment(), "procId=" + procId, + new ProcedureSyncWait.Predicate<Boolean>() { + @Override + public Boolean evaluate() throws IOException { + return !procExec.isRunning() || procExec.isFinished(procId); + } + } + ); + ProcedureInfo result = procExec.getResult(procId); if (result != null) { if (result.isFailed()) { @@ -104,6 +174,7 @@ public final class ProcedureSyncWait { public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents, String purpose, Predicate<T> predicate) throws IOException { final long done = EnvironmentEdgeManager.currentTime() + waitTime; + boolean logged = false; do { T result = predicate.evaluate(); if (result != null && !result.equals(Boolean.FALSE)) { @@ -115,7 +186,12 @@ public final class ProcedureSyncWait { LOG.warn("Interrupted while sleeping, waiting on " + purpose); throw (InterruptedIOException)new InterruptedIOException().initCause(e); } - LOG.debug("Waiting on " + purpose); + if (LOG.isTraceEnabled()) { + LOG.trace("waitFor " + 
purpose); + } else { + if (!logged) LOG.debug("waitFor " + purpose); + } + logged = true; } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning()); throw new TimeoutIOException("Timed out while waiting on " + purpose); @@ -133,44 +209,14 @@ public final class ProcedureSyncWait { } } - protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException { - final ServerManager sm = env.getMasterServices().getServerManager(); - ProcedureSyncWait.waitFor(env, "server to assign region(s)", - new ProcedureSyncWait.Predicate<Boolean>() { - @Override - public Boolean evaluate() throws IOException { - List<ServerName> servers = sm.createDestinationServersList(); - return servers != null && !servers.isEmpty(); - } - }); - } - - protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env, - final TableName tableName) throws IOException { - return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta", - new ProcedureSyncWait.Predicate<List<HRegionInfo>>() { - @Override - public List<HRegionInfo> evaluate() throws IOException { - if (TableName.META_TABLE_NAME.equals(tableName)) { - return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper()); - } - return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(),tableName); - } - }); - } - protected static void waitRegionInTransition(final MasterProcedureEnv env, final List<HRegionInfo> regions) throws IOException, CoordinatedStateException { - final AssignmentManager am = env.getMasterServices().getAssignmentManager(); - final RegionStates states = am.getRegionStates(); + final RegionStates states = env.getAssignmentManager().getRegionStates(); for (final HRegionInfo region : regions) { ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition", new ProcedureSyncWait.Predicate<Boolean>() { @Override public Boolean evaluate() throws IOException { - if 
(states.isRegionInState(region, State.FAILED_OPEN)) { - am.regionOffline(region); - } return !states.isRegionInTransition(region); } }); http://git-wip-us.apache.org/repos/asf/hbase/blob/db1dcf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java new file mode 100644 index 0000000..887e272 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -0,0 +1,541 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import com.google.common.collect.ArrayListMultimap; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * A remote procedure dispatcher for regionservers. 
+ */ +public class RSProcedureDispatcher + extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName> + implements ServerListener { + private static final Log LOG = LogFactory.getLog(RSProcedureDispatcher.class); + + public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY = + "hbase.regionserver.rpc.startup.waittime"; + private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000; + + private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0201000; // 2.1 + + protected final MasterServices master; + protected final long rsStartupWaitTime; + + public RSProcedureDispatcher(final MasterServices master) { + super(master.getConfiguration()); + + this.master = master; + this.rsStartupWaitTime = master.getConfiguration().getLong( + RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME); + } + + @Override + public boolean start() { + if (!super.start()) { + return false; + } + + master.getServerManager().registerListener(this); + for (ServerName serverName: master.getServerManager().getOnlineServersList()) { + addNode(serverName); + } + return true; + } + + @Override + public boolean stop() { + if (!super.stop()) { + return false; + } + + master.getServerManager().unregisterListener(this); + return true; + } + + @Override + protected void remoteDispatch(final ServerName serverName, + final Set<RemoteProcedure> operations) { + final int rsVersion = master.getAssignmentManager().getServerVersion(serverName); + if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) { + LOG.info(String.format( + "Using procedure batch rpc execution for serverName=%s version=%s", + serverName, rsVersion)); + submitTask(new ExecuteProceduresRemoteCall(serverName, operations)); + } else { + LOG.info(String.format( + "Fallback to compat rpc execution for serverName=%s version=%s", + serverName, rsVersion)); + submitTask(new CompatRemoteProcedureResolver(serverName, operations)); + } + } + + protected void abortPendingOperations(final ServerName serverName, + final 
Set<RemoteProcedure> operations) { + // TODO: Replace with a ServerNotOnlineException() + final IOException e = new DoNotRetryIOException("server not online " + serverName); + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + for (RemoteProcedure proc: operations) { + proc.remoteCallFailed(env, serverName, e); + } + } + + public void serverAdded(final ServerName serverName) { + addNode(serverName); + } + + public void serverRemoved(final ServerName serverName) { + removeNode(serverName); + } + + /** + * Base remote call + */ + protected abstract class AbstractRSRemoteCall implements Callable<Void> { + private final ServerName serverName; + + private int numberOfAttemptsSoFar = 0; + private long maxWaitTime = -1; + + public AbstractRSRemoteCall(final ServerName serverName) { + this.serverName = serverName; + } + + public abstract Void call(); + + protected AdminService.BlockingInterface getRsAdmin() throws IOException { + final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); + if (admin == null) { + throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + + " failed because no RPC connection found to this server"); + } + return admin; + } + + protected ServerName getServerName() { + return serverName; + } + + protected boolean scheduleForRetry(final IOException e) { + // Should we wait a little before retrying? If the server is starting it's yes. 
+ final boolean hold = (e instanceof ServerNotRunningYetException); + if (hold) { + LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d", + serverName, numberOfAttemptsSoFar), e); + long now = EnvironmentEdgeManager.currentTime(); + if (now < getMaxWaitTime()) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("server is not yet up; waiting up to %dms", + (getMaxWaitTime() - now)), e); + } + submitTask(this, 100, TimeUnit.MILLISECONDS); + return true; + } + + LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e); + return false; + } + + // In case socket is timed out and the region server is still online, + // the openRegion RPC could have been accepted by the server and + // just the response didn't go through. So we will retry to + // open the region on the same server. + final boolean retry = !hold && (e instanceof SocketTimeoutException + && master.getServerManager().isServerOnline(serverName)); + if (retry) { + // we want to retry as many times as needed as long as the RS is not dead. + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Retrying to same RegionServer %s because: %s", + serverName, e.getMessage()), e); + } + submitTask(this); + return true; + } + + // trying to send the request elsewhere instead + LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d", + serverName, numberOfAttemptsSoFar), e); + return false; + } + + private long getMaxWaitTime() { + if (this.maxWaitTime < 0) { + // This is the max attempts, not retries, so it should be at least 1. 
+ this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; + } + return this.maxWaitTime; + } + + protected IOException unwrapException(IOException e) { + if (e instanceof RemoteException) { + e = ((RemoteException)e).unwrapRemoteException(); + } + return e; + } + } + + private interface RemoteProcedureResolver { + void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations); + void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); + } + + public void splitAndResolveOperation(final ServerName serverName, + final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) { + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = + buildAndGroupRequestByType(env, serverName, operations); + + final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); + if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps); + + final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); + if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps); + + if (!reqsByType.isEmpty()) { + LOG.warn("unknown request type in the queue: " + reqsByType); + } + } + + // ========================================================================== + // Compatibility calls + // ========================================================================== + protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall + implements RemoteProcedureResolver { + private final Set<RemoteProcedure> operations; + + private ExecuteProceduresRequest.Builder request = null; + + public ExecuteProceduresRemoteCall(final ServerName serverName, + final Set<RemoteProcedure> operations) { + super(serverName); + this.operations = operations; + } + + public Void call() { + final MasterProcedureEnv env = 
master.getMasterProcedureExecutor().getEnvironment(); + + request = ExecuteProceduresRequest.newBuilder(); + splitAndResolveOperation(getServerName(), operations, this); + + try { + final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build()); + remoteCallCompleted(env, response); + } catch (IOException e) { + e = unwrapException(e); + // TODO: In the future some operation may want to bail out early. + // TODO: How many times should we retry (use numberOfAttemptsSoFar) + if (!scheduleForRetry(e)) { + remoteCallFailed(env, e); + } + } + return null; + } + + public void dispatchOpenRequests(final MasterProcedureEnv env, + final List<RegionOpenOperation> operations) { + request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations)); + } + + public void dispatchCloseRequests(final MasterProcedureEnv env, + final List<RegionCloseOperation> operations) { + for (RegionCloseOperation op: operations) { + request.addCloseRegion(op.buildCloseRegionRequest(getServerName())); + } + } + + protected ExecuteProceduresResponse sendRequest(final ServerName serverName, + final ExecuteProceduresRequest request) throws IOException { + try { + return getRsAdmin().executeProcedures(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + + private void remoteCallCompleted(final MasterProcedureEnv env, + final ExecuteProceduresResponse response) { + /* + for (RemoteProcedure proc: operations) { + proc.remoteCallCompleted(env, getServerName(), response); + }*/ + } + + private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { + for (RemoteProcedure proc: operations) { + proc.remoteCallFailed(env, getServerName(), e); + } + } + } + + // ========================================================================== + // Compatibility calls + // Since we don't have a "batch proc-exec" request on the target RS + // we have to chunk the requests by type and dispatch the specific 
request. + // ========================================================================== + private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env, + final ServerName serverName, final List<RegionOpenOperation> operations) { + final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); + builder.setServerStartCode(serverName.getStartcode()); + builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); + for (RegionOpenOperation op: operations) { + builder.addOpenInfo(op.buildRegionOpenInfoRequest(env)); + } + return builder.build(); + } + + private final class OpenRegionRemoteCall extends AbstractRSRemoteCall { + private final List<RegionOpenOperation> operations; + + public OpenRegionRemoteCall(final ServerName serverName, + final List<RegionOpenOperation> operations) { + super(serverName); + this.operations = operations; + } + + @Override + public Void call() { + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations); + + try { + OpenRegionResponse response = sendRequest(getServerName(), request); + remoteCallCompleted(env, response); + } catch (IOException e) { + e = unwrapException(e); + // TODO: In the future some operation may want to bail out early. 
+ // TODO: How many times should we retry (use numberOfAttemptsSoFar) + if (!scheduleForRetry(e)) { + remoteCallFailed(env, e); + } + } + return null; + } + + private OpenRegionResponse sendRequest(final ServerName serverName, + final OpenRegionRequest request) throws IOException { + try { + return getRsAdmin().openRegion(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + private void remoteCallCompleted(final MasterProcedureEnv env, + final OpenRegionResponse response) { + int index = 0; + for (RegionOpenOperation op: operations) { + OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++); + op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING); + op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op); + } + } + + private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { + for (RegionOpenOperation op: operations) { + op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); + } + } + } + + private final class CloseRegionRemoteCall extends AbstractRSRemoteCall { + private final RegionCloseOperation operation; + + public CloseRegionRemoteCall(final ServerName serverName, + final RegionCloseOperation operation) { + super(serverName); + this.operation = operation; + } + + @Override + public Void call() { + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName()); + try { + CloseRegionResponse response = sendRequest(getServerName(), request); + remoteCallCompleted(env, response); + } catch (IOException e) { + e = unwrapException(e); + // TODO: In the future some operation may want to bail out early. 
+ // TODO: How many times should we retry (use numberOfAttemptsSoFar) + if (!scheduleForRetry(e)) { + remoteCallFailed(env, e); + } + } + return null; + } + + private CloseRegionResponse sendRequest(final ServerName serverName, + final CloseRegionRequest request) throws IOException { + try { + return getRsAdmin().closeRegion(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + private void remoteCallCompleted(final MasterProcedureEnv env, + final CloseRegionResponse response) { + operation.setClosed(response.getClosed()); + operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation); + } + + private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { + operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); + } + } + + protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver { + private final Set<RemoteProcedure> operations; + private final ServerName serverName; + + public CompatRemoteProcedureResolver(final ServerName serverName, + final Set<RemoteProcedure> operations) { + this.serverName = serverName; + this.operations = operations; + } + + @Override + public Void call() { + splitAndResolveOperation(serverName, operations, this); + return null; + } + + public void dispatchOpenRequests(final MasterProcedureEnv env, + final List<RegionOpenOperation> operations) { + submitTask(new OpenRegionRemoteCall(serverName, operations)); + } + + public void dispatchCloseRequests(final MasterProcedureEnv env, + final List<RegionCloseOperation> operations) { + for (RegionCloseOperation op: operations) { + submitTask(new CloseRegionRemoteCall(serverName, op)); + } + } + } + + // ========================================================================== + // RPC Messages + // - ServerOperation: refreshConfig, grant, revoke, ... + // - RegionOperation: open, close, flush, snapshot, ... 
+ // ========================================================================== + public static abstract class ServerOperation extends RemoteOperation { + protected ServerOperation(final RemoteProcedure remoteProcedure) { + super(remoteProcedure); + } + } + + public static abstract class RegionOperation extends RemoteOperation { + private final HRegionInfo regionInfo; + + protected RegionOperation(final RemoteProcedure remoteProcedure, + final HRegionInfo regionInfo) { + super(remoteProcedure); + this.regionInfo = regionInfo; + } + + public HRegionInfo getRegionInfo() { + return this.regionInfo; + } + } + + public static class RegionOpenOperation extends RegionOperation { + private final List<ServerName> favoredNodes; + private final boolean openForReplay; + private boolean failedOpen; + + public RegionOpenOperation(final RemoteProcedure remoteProcedure, + final HRegionInfo regionInfo, final List<ServerName> favoredNodes, + final boolean openForReplay) { + super(remoteProcedure, regionInfo); + this.favoredNodes = favoredNodes; + this.openForReplay = openForReplay; + } + + protected void setFailedOpen(final boolean failedOpen) { + this.failedOpen = failedOpen; + } + + public boolean isFailedOpen() { + return failedOpen; + } + + public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest( + final MasterProcedureEnv env) { + return RequestConverter.buildRegionOpenInfo(getRegionInfo(), + env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false); + } + } + + public static class RegionCloseOperation extends RegionOperation { + private final ServerName destinationServer; + private boolean closed = false; + + public RegionCloseOperation(final RemoteProcedure remoteProcedure, + final HRegionInfo regionInfo, final ServerName destinationServer) { + super(remoteProcedure, regionInfo); + this.destinationServer = destinationServer; + } + + public ServerName getDestinationServer() { + return destinationServer; + } + + protected void setClosed(final boolean 
closed) { + this.closed = closed; + } + + public boolean isClosed() { + return closed; + } + + public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) { + return ProtobufUtil.buildCloseRegionRequest(serverName, + getRegionInfo().getRegionName(), getDestinationServer()); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/db1dcf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java index 21709f8..cfd9df9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MetricsSnapshot; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -416,17 +415,7 @@ public class RestoreSnapshotProcedure try { Connection conn = env.getMasterServices().getConnection(); - // 1. Forces all the RegionStates to be offline - // - // The AssignmentManager keeps all the region states around - // with no possibility to remove them, until the master is restarted. - // This means that a region marked as SPLIT before the restore will never be assigned again. 
- // To avoid having all states around all the regions are switched to the OFFLINE state, - // which is the same state that the regions will be after a delete table. - forceRegionsOffline(env, regionsToAdd); - forceRegionsOffline(env, regionsToRestore); - forceRegionsOffline(env, regionsToRemove); - + // 1. Prepare to restore getMonitorStatus().setStatus("Preparing to restore each region"); // 2. Applies changes to hbase:meta @@ -496,20 +485,6 @@ public class RestoreSnapshotProcedure } /** - * Make sure that region states of the region list is in OFFLINE state. - * @param env MasterProcedureEnv - * @param hris region info list - **/ - private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) { - RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates(); - if (hris != null) { - for (HRegionInfo hri: hris) { - states.regionOffline(hri); - } - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. * @return traceEnabled http://git-wip-us.apache.org/repos/asf/hbase/blob/db1dcf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java new file mode 100644 index 0000000..dd1874b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Passed as Exception by {@link ServerCrashProcedure} + * notifying on-going RIT that server has failed. + */ +@InterfaceAudience.Private +@SuppressWarnings("serial") +public class ServerCrashException extends HBaseIOException { + private final long procId; + + /** + * @param procId The pid of the ServerCrashProcedure reported in {@link #getMessage()}. + */ + public ServerCrashException(long procId) { + this.procId = procId; + } + + @Override + public String getMessage() { + return "Caused by ServerCrashProcedure pid=" + this.procId; + } +} \ No newline at end of file