// Source: http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
// (added as a new file by this commit; blob index 49124ea)
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;

/**
 * Base class for the Assign and Unassign Procedure.
 * There can only be one RegionTransitionProcedure per region running at a time
 * since each procedure takes a lock on the region (see MasterProcedureScheduler).
 *
 * <p>This procedure is asynchronous and responds to external events.
 * The AssignmentManager will notify this procedure when the RS completes
 * the operation and reports the transitioned state
 * (see the Assign and Unassign class for more detail).
 * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
 * first submitted, to the REGION_TRANSITION_DISPATCH state when the request
 * to remote server is sent and the Procedure is suspended waiting on external
 * event to be woken again. Once the external event is triggered, Procedure
 * moves to the REGION_TRANSITION_FINISH state.
 */
@InterfaceAudience.Private
public abstract class RegionTransitionProcedure
    extends Procedure<MasterProcedureEnv>
    implements TableProcedureInterface,
      RemoteProcedure<MasterProcedureEnv, ServerName> {
  private static final Log LOG = LogFactory.getLog(RegionTransitionProcedure.class);

  /** Set by {@link #abort(MasterProcedureEnv)} when the current state allows rollback. */
  protected final AtomicBoolean aborted = new AtomicBoolean(false);

  /** Current step of the QUEUE -> DISPATCH -> FINISH progression driven by {@link #execute}. */
  private RegionTransitionState transitionState =
      RegionTransitionState.REGION_TRANSITION_QUEUE;
  private HRegionInfo regionInfo;
  /** True between a successful {@link #acquireLock} and the matching {@link #releaseLock}. */
  private volatile boolean lock = false;

  public RegionTransitionProcedure() {
    // Required by the Procedure framework to create the procedure on replay
    super();
  }

  public RegionTransitionProcedure(final HRegionInfo regionInfo) {
    this.regionInfo = regionInfo;
  }

  /** @return the region this procedure operates on; may be null before deserialization. */
  public HRegionInfo getRegionInfo() {
    return regionInfo;
  }

  protected void setRegionInfo(final HRegionInfo regionInfo) {
    // Setter is for deserialization.
    this.regionInfo = regionInfo;
  }

  /** @return the table of the region, or null when no region info has been set. */
  @Override
  public TableName getTableName() {
    HRegionInfo hri = getRegionInfo();
    return hri != null? hri.getTable(): null;
  }

  /** @return whatever {@link TableName#isMetaTableName} reports for this procedure's table. */
  public boolean isMeta() {
    return TableName.isMetaTableName(getTableName());
  }

  /** Appends the simple class name, the table and the encoded region name (or null). */
  @Override
  public void toStringClassDetails(final StringBuilder sb) {
    sb.append(getClass().getSimpleName());
    sb.append(" table=");
    sb.append(getTableName());
    sb.append(", region=");
    sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName());
  }

  /**
   * Looks up, creating it if needed, the AssignmentManager's node for this region.
   * @param env the master procedure environment
   * @return the {@link RegionStateNode} for {@link #getRegionInfo()}
   */
  public RegionStateNode getRegionState(final MasterProcedureEnv env) {
    return env.getAssignmentManager().getRegionStates().
      getOrCreateRegionNode(getRegionInfo());
  }

  protected void setTransitionState(final RegionTransitionState state) {
    this.transitionState = state;
  }

  protected RegionTransitionState getTransitionState() {
    return transitionState;
  }

  /**
   * Called when the Procedure is in the REGION_TRANSITION_QUEUE state.
   * @return false when the operation figured it is done or it aborted, in which case
   *   {@link #execute} removes the region-in-transition and finishes; true to move on to
   *   REGION_TRANSITION_DISPATCH.
   */
  protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode)
    throws IOException, ProcedureSuspendedException;

  /**
   * Called when the Procedure is in the REGION_TRANSITION_DISPATCH state.
   * In here we do the RPC call to OPEN/CLOSE the region. The suspending of
   * the thread so it sleeps until it gets update that the OPEN/CLOSE has
   * succeeded is complicated. Read the implementations to learn more.
   * @return false when the operation figured it is done or it aborted; true otherwise.
   */
  protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode)
    throws IOException, ProcedureSuspendedException;

  /** Called when the Procedure is in the REGION_TRANSITION_FINISH state. */
  protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode)
    throws IOException, ProcedureSuspendedException;

  /**
   * Subclass hook invoked by
   * {@link #reportTransition(MasterProcedureEnv, ServerName, TransitionCode, long)} once the
   * reporting server has been checked against the region's location.
   */
  protected abstract void reportTransition(MasterProcedureEnv env,
      RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;

  public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);

  /**
   * Subclass hook invoked by
   * {@link #remoteCallFailed(MasterProcedureEnv, ServerName, IOException)} before the
   * procedure event is woken.
   */
  protected abstract void remoteCallFailed(MasterProcedureEnv env,
      RegionStateNode regionNode, IOException exception);

  @Override
  public void remoteCallCompleted(final MasterProcedureEnv env,
      final ServerName serverName, final RemoteOperation response) {
    // Ignore the response? reportTransition() is the one that count?
  }

  /**
   * Logs the failure, lets the subclass react, then wakes the region's procedure event.
   * @param env the master procedure environment
   * @param serverName server the remote call was addressed to
   * @param exception the failure; its message (or simple class name if none) is logged
   */
  @Override
  public void remoteCallFailed(final MasterProcedureEnv env,
      final ServerName serverName, final IOException exception) {
    final RegionStateNode regionNode = getRegionState(env);
    assert serverName.equals(regionNode.getRegionLocation());
    String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
      exception.getMessage();
    LOG.warn("Failed " + this + "; " + regionNode.toShortString() + "; exception=" + msg);
    remoteCallFailed(env, regionNode, exception);
    // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
    // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
    // this method. Just get out of this current processing quickly.
    env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
  }

  /**
   * Be careful! At the end of this method, the procedure has either succeeded
   * and this procedure has been set into a suspended state OR, we failed and
   * this procedure has been put back on the scheduler ready for another worker
   * to pick it up. In both cases, we need to exit the current Worker processing
   * toute de suite!
   * @return True if we successfully dispatched the call and false if we failed;
   * if failed, we need to roll back any setup done for the dispatch.
   */
  protected boolean addToRemoteDispatcher(final MasterProcedureEnv env,
      final ServerName targetServer) {
    // Resolve the node once; every use below refers to the same region.
    final RegionStateNode regionNode = getRegionState(env);
    assert targetServer.equals(regionNode.getRegionLocation()) :
      "targetServer=" + targetServer + " getRegionLocation=" +
        regionNode.getRegionLocation(); // TODO

    LOG.info("Dispatch " + this + "; " + regionNode.toShortString());

    // Put this procedure into suspended mode to wait on report of state change
    // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
    env.getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent());

    // Tricky because this can fail. If it fails need to backtrack on stuff like
    // the 'suspend' done above -- tricky as the 'wake' requeues us -- and ditto
    // up in the caller; it needs to undo state changes.
    if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
      remoteCallFailed(env, targetServer,
        new FailedRemoteDispatchException(this + " to " + targetServer));
      return false;
    }
    return true;
  }

  /**
   * Handles a transition report from a server: verifies the reporter against the region's
   * recorded location, delegates to the subclass, then wakes the region's procedure event.
   * @param env the master procedure environment
   * @param serverName the server that sent the report
   * @param code the reported transition
   * @param seqId sequence id reported with the transition
   * @throws UnexpectedStateException if the report comes from a server other than the region's
   *   location (a meta region with no recorded location adopts the reporter instead)
   */
  protected void reportTransition(final MasterProcedureEnv env, final ServerName serverName,
      final TransitionCode code, final long seqId) throws UnexpectedStateException {
    final RegionStateNode regionNode = getRegionState(env);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Received report " + code + " seqId=" + seqId + ", " +
            this + "; " + regionNode.toShortString());
    }
    if (!serverName.equals(regionNode.getRegionLocation())) {
      if (isMeta() && regionNode.getRegionLocation() == null) {
        regionNode.setRegionLocation(serverName);
      } else {
        throw new UnexpectedStateException(String.format(
          "Unexpected state=%s from server=%s; expected server=%s; %s; %s",
          code, serverName, regionNode.getRegionLocation(),
          this, regionNode.toShortString()));
      }
    }

    reportTransition(env, regionNode, code, seqId);

    // NOTE: This call adds this procedure back on the scheduler.
    // This makes it so this procedure can run again. Another worker will take
    // processing to the next stage. At an extreme, the other worker may run in
    // parallel so DO NOT CHANGE any state hereafter! This should be last thing
    // done in this processing step.
    env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
  }

  protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
    return isServerOnline(env, regionNode.getRegionLocation());
  }

  protected boolean isServerOnline(final MasterProcedureEnv env, final ServerName serverName) {
    return env.getMasterServices().getServerManager().isServerOnline(serverName);
  }

  /** Appends ":" plus the transition state to the base state while the procedure is unfinished. */
  @Override
  protected void toStringState(StringBuilder builder) {
    super.toStringState(builder);
    RegionTransitionState ts = this.transitionState;
    if (!isFinished() && ts != null) {
      builder.append(":").append(ts);
    }
  }

  /**
   * Runs one step of the transition.
   * @return null when the procedure is done (finished, short-circuited by the subclass, or
   *   failed because another procedure owns the region); otherwise an array holding this
   *   procedure so that it is executed again.
   * @throws ProcedureSuspendedException when the procedure parked itself on the region's
   *   procedure event
   */
  @Override
  protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException {
    final AssignmentManager am = env.getAssignmentManager();
    final RegionStateNode regionNode = getRegionState(env);
    if (!am.addRegionInTransition(regionNode, this)) {
      String msg = String.format(
        "There is already another procedure running on this region this=%s owner=%s",
        this, regionNode.getProcedure());
      LOG.warn(msg + " " + this + "; " + regionNode.toShortString());
      setAbortFailure(getClass().getSimpleName(), msg);
      return null;
    }
    try {
      boolean retry;
      do {
        retry = false;
        switch (transitionState) {
          case REGION_TRANSITION_QUEUE:
            // 1. push into the AM queue for balancer policy
            if (!startTransition(env, regionNode)) {
              // The operation figured it is done or it aborted; check getException()
              am.removeRegionInTransition(regionNode, this);
              return null;
            }
            transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
            if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
              // Why this suspend? Because we want to ensure Store happens before proceed?
              throw new ProcedureSuspendedException();
            }
            break;

          case REGION_TRANSITION_DISPATCH:
            // 2. send the request to the target server
            if (!updateTransition(env, regionNode)) {
              // The operation figured it is done or it aborted; check getException()
              am.removeRegionInTransition(regionNode, this);
              return null;
            }
            if (transitionState != RegionTransitionState.REGION_TRANSITION_DISPATCH) {
              // The subclass moved the state on; handle the new state in this same call.
              retry = true;
              break;
            }
            if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
              throw new ProcedureSuspendedException();
            }
            break;

          case REGION_TRANSITION_FINISH:
            // 3. wait assignment response. completion/failure
            finishTransition(env, regionNode);
            am.removeRegionInTransition(regionNode, this);
            return null;
        }
      } while (retry);
    } catch (IOException e) {
      LOG.warn("Retryable error trying to transition: " +
          this + "; " + regionNode.toShortString(), e);
    }

    return new Procedure[] {this};
  }

  /**
   * Removes the region-in-transition when the current state supports rollback.
   * @throws UnsupportedOperationException when the current state does not support rollback
   */
  @Override
  protected void rollback(final MasterProcedureEnv env) {
    if (isRollbackSupported(transitionState)) {
      // Nothing done up to this point. abort safely.
      // This should happen when something like disableTable() is triggered.
      env.getAssignmentManager().removeRegionInTransition(getRegionState(env), this);
      return;
    }

    // There is no rollback for assignment unless we cancel the operation by
    // dropping/disabling the table.
    throw new UnsupportedOperationException("Unhandled state " + transitionState +
        "; there is no rollback for assignment unless we cancel the operation by " +
        "dropping/disabling the table");
  }

  protected abstract boolean isRollbackSupported(final RegionTransitionState state);

  /** @return true, after flagging {@link #aborted}, only if the current state supports rollback. */
  @Override
  protected boolean abort(final MasterProcedureEnv env) {
    if (isRollbackSupported(transitionState)) {
      aborted.set(true);
      return true;
    }
    return false;
  }

  /**
   * Tries to take the region lock from the procedure scheduler.
   * @return {@link LockState#LOCK_EVENT_WAIT} when the procedure has to wait (failover cleanup,
   *   meta initialization, or the region itself), {@link LockState#LOCK_ACQUIRED} otherwise
   */
  @Override
  protected LockState acquireLock(final MasterProcedureEnv env) {
    // Unless we are assigning meta, wait for meta to be available and loaded.
    if (!isMeta() && (env.waitFailoverCleanup(this) ||
        env.getAssignmentManager().waitMetaInitialized(this, getRegionInfo()))) {
      return LockState.LOCK_EVENT_WAIT;
    }

    // TODO: Revisit this and move it to the executor
    if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
      // Only build the lock dump when it will actually be logged.
      if (LOG.isDebugEnabled()) {
        try {
          LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " +
            env.getProcedureScheduler().dumpLocks());
        } catch (IOException e) {
          // The dump is diagnostic only; report through the logger and carry on waiting.
          LOG.warn("pid=" + getProcId() + " failed to dump scheduler locks", e);
        }
      }
      return LockState.LOCK_EVENT_WAIT;
    }
    this.lock = true;
    return LockState.LOCK_ACQUIRED;
  }

  /** Wakes the region in the scheduler and clears the local lock flag. */
  @Override
  protected void releaseLock(final MasterProcedureEnv env) {
    env.getProcedureScheduler().wakeRegion(this, getRegionInfo());
    lock = false;
  }

  /** @return always true. */
  @Override
  protected boolean holdLock(final MasterProcedureEnv env) {
    return true;
  }

  @Override
  protected boolean hasLock(final MasterProcedureEnv env) {
    return lock;
  }

  @Override
  protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
    // The operation is triggered internally on the server
    // the client does not know about this procedure.
    return false;
  }

  /**
   * Used by ServerCrashProcedure to see if this Assign/Unassign needs processing.
   * @return ServerName the Assign or Unassign is going against.
   */
  public abstract ServerName getServer(final MasterProcedureEnv env);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java new file mode 100644 index 0000000..a893783 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -0,0 +1,733 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MasterSwitchType; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; +import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStore; +import 
org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SplitTableRegionState; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The procedure to split a region in a table. + * Takes lock on the parent region. + * It holds the lock for the life of the procedure. + */ +@InterfaceAudience.Private +public class SplitTableRegionProcedure + extends AbstractStateMachineRegionProcedure<SplitTableRegionState> { + private static final Log LOG = LogFactory.getLog(SplitTableRegionProcedure.class); + private Boolean traceEnabled = null; + private HRegionInfo daughter_1_HRI; + private HRegionInfo daughter_2_HRI; + + public SplitTableRegionProcedure() { + // Required by the Procedure framework to create the procedure on replay + } + + public SplitTableRegionProcedure(final MasterProcedureEnv env, + final HRegionInfo regionToSplit, final byte[] splitRow) throws IOException { + super(env, regionToSplit); + + checkSplitRow(regionToSplit, splitRow); + + final TableName table = regionToSplit.getTable(); + final long rid = getDaughterRegionIdTimestamp(regionToSplit); + this.daughter_1_HRI = new HRegionInfo(table, regionToSplit.getStartKey(), splitRow, false, rid); + this.daughter_2_HRI = new HRegionInfo(table, splitRow, regionToSplit.getEndKey(), false, rid); + } + + private static void checkSplitRow(final HRegionInfo regionToSplit, final byte[] splitRow) + throws IOException { + if (splitRow == null || splitRow.length == 0) { + 
throw new DoNotRetryIOException("Split row cannot be null"); + } + + if (Bytes.equals(regionToSplit.getStartKey(), splitRow)) { + throw new DoNotRetryIOException( + "Split row is equal to startkey: " + Bytes.toStringBinary(splitRow)); + } + + if (!regionToSplit.containsRow(splitRow)) { + throw new DoNotRetryIOException( + "Split row is not inside region key range splitKey:" + Bytes.toStringBinary(splitRow) + + " region: " + regionToSplit); + } + } + + /** + * Calculate daughter regionid to use. + * @param hri Parent {@link HRegionInfo} + * @return Daughter region id (timestamp) to use. + */ + private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) { + long rid = EnvironmentEdgeManager.currentTime(); + // Regionid is timestamp. Can't be less than that of parent else will insert + // at wrong location in hbase:meta (See HBASE-710). + if (rid < hri.getRegionId()) { + LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() + + " but current time here is " + rid); + rid = hri.getRegionId() + 1; + } + return rid; + } + + @Override + protected Flow executeFromState(final MasterProcedureEnv env, final SplitTableRegionState state) + throws InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + + try { + switch (state) { + case SPLIT_TABLE_REGION_PREPARE: + if (prepareSplitRegion(env)) { + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION); + break; + } else { + assert isFailed() : "split region should have an exception here"; + return Flow.NO_MORE_STATE; + } + case SPLIT_TABLE_REGION_PRE_OPERATION: + preSplitRegion(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION); + break; + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: + addChildProcedure(createUnassignProcedures(env, getRegionReplication(env))); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS); + break; + case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: + 
createDaughterRegions(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR: + preSplitRegionBeforePONR(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META); + break; + case SPLIT_TABLE_REGION_UPDATE_META: + updateMetaForDaughterRegions(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR: + preSplitRegionAfterPONR(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS); + break; + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + addChildProcedure(createAssignProcedures(env, getRegionReplication(env))); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION); + break; + case SPLIT_TABLE_REGION_POST_OPERATION: + postSplitRegion(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + String msg = "Error trying to split region " + getParentRegion().getEncodedName() + " in the table " + + getTableName() + " (in state=" + state + ")"; + if (!isRollbackSupported(state)) { + // We reach a state that cannot be rolled back. We just need to keep retry. 
+ LOG.warn(msg, e); + } else { + LOG.error(msg, e); + setFailure(e); + } + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final SplitTableRegionState state) + throws IOException, InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + + try { + switch (state) { + case SPLIT_TABLE_REGION_POST_OPERATION: + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR: + case SPLIT_TABLE_REGION_UPDATE_META: + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR: + break; + case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: + // Doing nothing, as re-open parent region would clean up daughter region directories. + break; + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: + openParentRegion(env); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION: + postRollBackSplitRegion(env); + break; + case SPLIT_TABLE_REGION_PREPARE: + break; // nothing to do + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + // This will be retried. Unless there is a bug in the code, + // this should be just a "temporary error" (e.g. network down) + LOG.warn("pid=" + getProcId() + " failed rollback attempt step " + state + + " for splitting the region " + + getParentRegion().getEncodedName() + " in table " + getTableName(), e); + throw e; + } + } + + /* + * Check whether we are in the state that can be rollback + */ + @Override + protected boolean isRollbackSupported(final SplitTableRegionState state) { + switch (state) { + case SPLIT_TABLE_REGION_POST_OPERATION: + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR: + case SPLIT_TABLE_REGION_UPDATE_META: + // It is not safe to rollback if we reach to these states. 
+ return false; + default: + break; + } + return true; + } + + @Override + protected SplitTableRegionState getState(final int stateId) { + return SplitTableRegionState.forNumber(stateId); + } + + @Override + protected int getStateId(final SplitTableRegionState state) { + return state.getNumber(); + } + + @Override + protected SplitTableRegionState getInitialState() { + return SplitTableRegionState.SPLIT_TABLE_REGION_PREPARE; + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + final MasterProcedureProtos.SplitTableRegionStateData.Builder splitTableRegionMsg = + MasterProcedureProtos.SplitTableRegionStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) + .setParentRegionInfo(HRegionInfo.convert(getRegion())) + .addChildRegionInfo(HRegionInfo.convert(daughter_1_HRI)) + .addChildRegionInfo(HRegionInfo.convert(daughter_2_HRI)); + splitTableRegionMsg.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + final MasterProcedureProtos.SplitTableRegionStateData splitTableRegionsMsg = + MasterProcedureProtos.SplitTableRegionStateData.parseDelimitedFrom(stream); + setUser(MasterProcedureUtil.toUserInfo(splitTableRegionsMsg.getUserInfo())); + setRegion(HRegionInfo.convert(splitTableRegionsMsg.getParentRegionInfo())); + assert(splitTableRegionsMsg.getChildRegionInfoCount() == 2); + daughter_1_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfo(0)); + daughter_2_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfo(1)); + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" table="); + sb.append(getTableName()); + sb.append(", parent="); + sb.append(getParentRegion().getShortNameToLog()); + sb.append(", daughterA="); + 
sb.append(daughter_1_HRI.getShortNameToLog()); + sb.append(", daughterB="); + sb.append(daughter_2_HRI.getShortNameToLog()); + } + + private HRegionInfo getParentRegion() { + return getRegion(); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_SPLIT; + } + + private byte[] getSplitRow() { + return daughter_2_HRI.getStartKey(); + } + + private static State [] EXPECTED_SPLIT_STATES = new State [] {State.OPEN, State.CLOSED}; + /** + * Prepare to Split region. + * @param env MasterProcedureEnv + * @throws IOException + */ + @VisibleForTesting + public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException { + // Check whether the region is splittable + RegionStateNode node = env.getAssignmentManager().getRegionStates().getRegionNode(getParentRegion()); + HRegionInfo parentHRI = null; + if (node != null) { + parentHRI = node.getRegionInfo(); + + // Lookup the parent HRI state from the AM, which has the latest updated info. + // Protect against the case where concurrent SPLIT requests came in and succeeded + // just before us. + if (node.isInState(State.SPLIT)) { + LOG.info("Split of " + parentHRI + " skipped; state is already SPLIT"); + return false; + } + if (parentHRI.isSplit() || parentHRI.isOffline()) { + LOG.info("Split of " + parentHRI + " skipped because offline/split."); + return false; + } + + // expected parent to be online or closed + if (!node.isInState(EXPECTED_SPLIT_STATES)) { + // We may have SPLIT already? + setFailure(new IOException("Split " + parentHRI.getRegionNameAsString() + + " FAILED because state=" + node.getState() + "; expected " + + Arrays.toString(EXPECTED_SPLIT_STATES))); + return false; + } + + // Ask the remote regionserver if this region is splittable. If we get an IOE, report it + // along w/ the failure so can see why we are not splittable at this time. 
+ IOException splittableCheckIOE = null; + boolean splittable = false; + try { + GetRegionInfoResponse response = + Util.getRegionInfoResponse(env, node.getRegionLocation(), node.getRegionInfo()); + splittable = response.hasSplittable() && response.getSplittable(); + if (LOG.isDebugEnabled()) { + LOG.debug("Splittable=" + splittable + " " + this + " " + node.toShortString()); + } + } catch (IOException e) { + splittableCheckIOE = e; + } + if (!splittable) { + IOException e = new IOException(parentHRI.getShortNameToLog() + " NOT splittable"); + if (splittableCheckIOE != null) e.initCause(splittableCheckIOE); + setFailure(e); + return false; + } + } + + // Since we have the lock and the master is coordinating the operation + // we are always able to split the region + if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { + LOG.warn("pid=" + getProcId() + " split switch is off! skip split of " + parentHRI); + setFailure(new IOException("Split region " + + (parentHRI == null? "null": parentHRI.getRegionNameAsString()) + + " failed due to split switch off")); + return false; + } + return true; + } + + /** + * Action before splitting region in a table. + * @param env MasterProcedureEnv + * @param state the procedure state + * @throws IOException + * @throws InterruptedException + */ + private void preSplitRegion(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preSplitRegionAction(getTableName(), getSplitRow(), getUser()); + } + } + + /** + * Action after rollback a split table region action. 
+ * @param env MasterProcedureEnv + * @throws IOException + */ + private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postRollBackSplitRegionAction(getUser()); + } + } + + /** + * Rollback close parent region + * @param env MasterProcedureEnv + **/ + private void openParentRegion(final MasterProcedureEnv env) throws IOException { + // Check whether the region is closed; if so, open it in the same server + final int regionReplication = getRegionReplication(env); + final ServerName serverName = getParentRegionServerName(env); + + final AssignProcedure[] procs = new AssignProcedure[regionReplication]; + for (int i = 0; i < regionReplication; ++i) { + final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(getParentRegion(), i); + procs[i] = env.getAssignmentManager().createAssignProcedure(hri, serverName); + } + env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs); + } + + /** + * Create daughter regions + * @param env MasterProcedureEnv + * @throws IOException + */ + @VisibleForTesting + public void createDaughterRegions(final MasterProcedureEnv env) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), getTableName()); + final FileSystem fs = mfs.getFileSystem(); + HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false); + regionFs.createSplitsDir(); + + Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs); + + assertReferenceFileCount(fs, expectedReferences.getFirst(), + regionFs.getSplitsDir(daughter_1_HRI)); + //Move the files from the temporary .splits to the final /table/region directory + regionFs.commitDaughterRegion(daughter_1_HRI); + assertReferenceFileCount(fs, 
expectedReferences.getFirst(), + new Path(tabledir, daughter_1_HRI.getEncodedName())); + + assertReferenceFileCount(fs, expectedReferences.getSecond(), + regionFs.getSplitsDir(daughter_2_HRI)); + regionFs.commitDaughterRegion(daughter_2_HRI); + assertReferenceFileCount(fs, expectedReferences.getSecond(), + new Path(tabledir, daughter_2_HRI.getEncodedName())); + } + + /** + * Split the parent region's store files for both daughter regions + * @param env MasterProcedureEnv + * @throws IOException + */ + private Pair<Integer, Integer> splitStoreFiles( + final MasterProcedureEnv env, + final HRegionFileSystem regionFs) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Configuration conf = env.getMasterConfiguration(); + + // The following code sets up a thread pool executor with up to as many slots as + // there are files to split. It then fires up everything, waits for + // completion and finally checks for any exception + // + // Note: splitStoreFiles creates daughter region dirs under the parent splits dir + // Nothing to unroll here if failure -- re-run createSplitsDir will + // clean this up. + int nbFiles = 0; + for (String family: regionFs.getFamilies()) { + final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family); + if (storeFiles != null) { + nbFiles += storeFiles.size(); + } + } + if (nbFiles == 0) { + // no file needs to be split. + return new Pair<Integer, Integer>(0,0); + } + // Max #threads is the smaller of the number of storefiles or the configured maximum.
+ int maxThreads = Math.min( + conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, + conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)), + nbFiles); + LOG.info("pid=" + getProcId() + " preparing to split " + nbFiles + " storefiles for region " + + getParentRegion().getShortNameToLog() + " using " + maxThreads + " threads"); + final ExecutorService threadPool = Executors.newFixedThreadPool( + maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d")); + final List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>>(nbFiles); + + // Split each store file. + final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); + for (String family: regionFs.getFamilies()) { + final HColumnDescriptor hcd = htd.getFamily(family.getBytes()); + final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family); + if (storeFiles != null && storeFiles.size() > 0) { + final CacheConfig cacheConf = new CacheConfig(conf, hcd); + for (StoreFileInfo storeFileInfo: storeFiles) { + StoreFileSplitter sfs = new StoreFileSplitter( + regionFs, + family.getBytes(), + new StoreFile( + mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType())); + futures.add(threadPool.submit(sfs)); + } + } + } + // Shutdown the pool + threadPool.shutdown(); + + // Wait for all the tasks to finish + long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout", 30000); + try { + boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS); + if (stillRunning) { + threadPool.shutdownNow(); + // wait for the thread to shutdown completely. 
+ while (!threadPool.isTerminated()) { + Thread.sleep(50); + } + throw new IOException("Took too long to split the" + + " files and create the references, aborting split"); + } + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + + int daughterA = 0; + int daughterB = 0; + // Look for any exception + for (Future<Pair<Path, Path>> future : futures) { + try { + Pair<Path, Path> p = future.get(); + daughterA += p.getFirst() != null ? 1 : 0; + daughterB += p.getSecond() != null ? 1 : 0; + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("pid=" + getProcId() + " split storefiles for region " + getParentRegion().getShortNameToLog() + + " Daughter A: " + daughterA + " storefiles, Daughter B: " + daughterB + " storefiles."); + } + return new Pair<Integer, Integer>(daughterA, daughterB); + } + + private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount, + final Path dir) throws IOException { + if (expectedReferenceFileCount != 0 && + expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) { + throw new IOException("Failing split. 
Expected reference file count isn't equal."); + } + } + + private Pair<Path, Path> splitStoreFile(final HRegionFileSystem regionFs, + final byte[] family, final StoreFile sf) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("pid=" + getProcId() + " splitting started for store file: " + + sf.getPath() + " for region: " + getParentRegion()); + } + + final byte[] splitRow = getSplitRow(); + final String familyName = Bytes.toString(family); + final Path path_first = + regionFs.splitStoreFile(this.daughter_1_HRI, familyName, sf, splitRow, false, null); + final Path path_second = + regionFs.splitStoreFile(this.daughter_2_HRI, familyName, sf, splitRow, true, null); + if (LOG.isDebugEnabled()) { + LOG.debug("pid=" + getProcId() + " splitting complete for store file: " + + sf.getPath() + " for region: " + getParentRegion().getShortNameToLog()); + } + return new Pair<Path,Path>(path_first, path_second); + } + + /** + * Utility class used to do the file splitting / reference writing + * in parallel instead of sequentially. 
+ */ + private class StoreFileSplitter implements Callable<Pair<Path,Path>> { + private final HRegionFileSystem regionFs; + private final byte[] family; + private final StoreFile sf; + + /** + * Constructor that takes what it needs to split + * @param regionFs the file system + * @param family Family that contains the store file + * @param sf which file + */ + public StoreFileSplitter(final HRegionFileSystem regionFs, final byte[] family, + final StoreFile sf) { + this.regionFs = regionFs; + this.sf = sf; + this.family = family; + } + + public Pair<Path,Path> call() throws IOException { + return splitStoreFile(regionFs, family, sf); + } + } + + /** + * Pre split region actions before the Point-of-No-Return step + * @param env MasterProcedureEnv + **/ + private void preSplitRegionBeforePONR(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final List<Mutation> metaEntries = new ArrayList<Mutation>(); + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + if (cpHost.preSplitBeforePONRAction(getSplitRow(), metaEntries, getUser())) { + throw new IOException("Coprocessor bypassing region " + + getParentRegion().getRegionNameAsString() + " split."); + } + try { + for (Mutation p : metaEntries) { + HRegionInfo.parseRegionName(p.getRow()); + } + } catch (IOException e) { + LOG.error("pid=" + getProcId() + " row key of mutation from coprocessor not parsable as " + + "region name."
+ + "Mutations from coprocessor should only for hbase:meta table."); + throw e; + } + } + } + + /** + * Add daughter regions to META + * @param env MasterProcedureEnv + * @throws IOException + */ + private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException { + env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env), + daughter_1_HRI, daughter_2_HRI); + } + + /** + * Pre split region actions after the Point-of-No-Return step + * @param env MasterProcedureEnv + **/ + private void preSplitRegionAfterPONR(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preSplitAfterPONRAction(getUser()); + } + } + + /** + * Post split region actions + * @param env MasterProcedureEnv + **/ + private void postSplitRegion(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postCompletedSplitRegionAction(daughter_1_HRI, daughter_2_HRI, getUser()); + } + } + + private ServerName getParentRegionServerName(final MasterProcedureEnv env) { + return env.getMasterServices().getAssignmentManager() + .getRegionStates().getRegionServerOfRegion(getParentRegion()); + } + + private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env, + final int regionReplication) { + final UnassignProcedure[] procs = new UnassignProcedure[regionReplication]; + for (int i = 0; i < procs.length; ++i) { + final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(getParentRegion(), i); + procs[i] = env.getAssignmentManager().createUnassignProcedure(hri, null, true); + } + return procs; + } + + private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env, + final int regionReplication) { + final ServerName targetServer = getParentRegionServerName(env); + final AssignProcedure[] 
procs = new AssignProcedure[regionReplication * 2]; + int procsIdx = 0; + for (int i = 0; i < regionReplication; ++i) { + final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(daughter_1_HRI, i); + procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, targetServer); + } + for (int i = 0; i < regionReplication; ++i) { + final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(daughter_2_HRI, i); + procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, targetServer); + } + return procs; + } + + private int getRegionReplication(final MasterProcedureEnv env) throws IOException { + final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); + return htd.getRegionReplication(); + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @return traceEnabled + */ + private boolean isTraceEnabled() { + if (traceEnabled == null) { + traceEnabled = LOG.isTraceEnabled(); + } + return traceEnabled; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java new file mode 100644 index 0000000..75a8e7f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java @@ -0,0 +1,261 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ServerCrashException; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData; 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; + + +/** + * Procedure that describes the unassignment of a single region. + * There can only be one RegionTransitionProcedure per region running at a time, + * since each procedure takes a lock on the region. + * + * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher + * queue, and the procedure will then go into a "waiting state". + * The Remote Dispatcher will batch the various requests for that server and + * they will be sent to the RS for execution. + * The RS will complete the close operation by calling master.reportRegionStateTransition(). + * The AM will intercept the transition report, and notify the procedure. + * The procedure will finish the unassign by publishing its new state on meta + * or it will retry the unassign. + */ +@InterfaceAudience.Private +public class UnassignProcedure extends RegionTransitionProcedure { + private static final Log LOG = LogFactory.getLog(UnassignProcedure.class); + + /** + * Where to send the unassign RPC. + */ + protected volatile ServerName hostingServer; + /** + * The Server we will subsequently assign the region to (can be null). + */ + protected volatile ServerName destinationServer; + + protected final AtomicBoolean serverCrashed = new AtomicBoolean(false); + + // TODO: should this be in a reassign procedure? + // ...and keep unassign for 'disable' case?
+ private boolean force; + + public UnassignProcedure() { + // Required by the Procedure framework to create the procedure on replay + super(); + } + + public UnassignProcedure(final HRegionInfo regionInfo, final ServerName hostingServer, + final boolean force) { + this(regionInfo, hostingServer, null, force); + } + + public UnassignProcedure(final HRegionInfo regionInfo, + final ServerName hostingServer, final ServerName destinationServer, final boolean force) { + super(regionInfo); + this.hostingServer = hostingServer; + this.destinationServer = destinationServer; + this.force = force; + + // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request + setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_UNASSIGN; + } + + @Override + protected boolean isRollbackSupported(final RegionTransitionState state) { + switch (state) { + case REGION_TRANSITION_QUEUE: + case REGION_TRANSITION_DISPATCH: + return true; + default: + return false; + } + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder() + .setTransitionState(getTransitionState()) + .setHostingServer(ProtobufUtil.toServerName(this.hostingServer)) + .setRegionInfo(HRegionInfo.convert(getRegionInfo())); + if (this.destinationServer != null) { + state.setDestinationServer(ProtobufUtil.toServerName(destinationServer)); + } + if (force) { + state.setForce(true); + } + state.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + final UnassignRegionStateData state = UnassignRegionStateData.parseDelimitedFrom(stream); + setTransitionState(state.getTransitionState()); + setRegionInfo(HRegionInfo.convert(state.getRegionInfo())); + this.hostingServer = 
ProtobufUtil.toServerName(state.getHostingServer()); + force = state.getForce(); + if (state.hasDestinationServer()) { + this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer()); + } + } + + @Override + protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) { + // nothing to do here. we skip the step in the constructor + // by jumping to REGION_TRANSITION_DISPATCH + throw new UnsupportedOperationException(); + } + + @Override + protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) + throws IOException { + // if the region is already closed or offline we can't do much... + if (regionNode.isInState(State.CLOSED, State.OFFLINE)) { + LOG.info("Not unassigned " + this + "; " + regionNode.toShortString()); + return false; + } + + // if the server is down, mark the operation as complete + if (serverCrashed.get() || !isServerOnline(env, regionNode)) { + LOG.info("Server already down: " + this + "; " + regionNode.toShortString()); + return false; + } + + // if we haven't started the operation yet, we can abort + if (aborted.get() && regionNode.isInState(State.OPEN)) { + setAbortFailure(getClass().getSimpleName(), "abort requested"); + return false; + } + + // Mark the region as CLOSING. + env.getAssignmentManager().markRegionAsClosing(regionNode); + + // Add the close region operation to the server dispatch queue. + if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) { + // If addToRemoteDispatcher fails, it calls #remoteCallFailed which + // does all cleanup. + } + + // We always return true, even if we fail dispatch because addToRemoteDispatcher + // failure processing sets state back to REGION_TRANSITION_QUEUE so we try again; + // i.e. return true to keep the Procedure running; it has been reset to start over.
+ return true; + } + + @Override + protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) + throws IOException { + env.getAssignmentManager().markRegionAsClosed(regionNode); + } + + @Override + public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) { + assert serverName.equals(getRegionState(env).getRegionLocation()); + return new RegionCloseOperation(this, getRegionInfo(), this.destinationServer); + } + + @Override + protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode, + final TransitionCode code, final long seqId) throws UnexpectedStateException { + switch (code) { + case CLOSED: + setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH); + break; + default: + throw new UnexpectedStateException(String.format( + "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.", + code, regionNode.getRegionInfo(), regionNode.getRegionLocation())); + } + } + + @Override + protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode, + final IOException exception) { + // TODO: Is there on-going rpc to cleanup? + if (exception instanceof ServerCrashException) { + // This exception comes from ServerCrashProcedure after log splitting. + // It is ok to let this procedure go on to complete close now. + // This will release lock on this region so the subsequent assign can succeed. + try { + reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM); + } catch (UnexpectedStateException e) { + // Should never happen. + throw new RuntimeException(e); + } + } else if (exception instanceof RegionServerAbortedException || + exception instanceof RegionServerStoppedException || + exception instanceof ServerNotRunningYetException) { + // TODO + // RS is aborting, we cannot offline the region since the region may need to do WAL + // recovery. 
Until we see the RS expiration, we should retry. + LOG.info("Ignoring; waiting on ServerCrashProcedure", exception); + // serverCrashed.set(true); + } else if (exception instanceof NotServingRegionException) { + LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode, exception); + setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH); + } else { + // TODO: kill the server in case we get an exception we are not able to handle + LOG.warn("Killing server; unexpected exception; " + + this + "; " + regionNode.toShortString() + + " exception=" + exception); + env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation()); + serverCrashed.set(true); + } + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + super.toStringClassDetails(sb); + sb.append(", server=").append(this.hostingServer); + } + + @Override + public ServerName getServer(final MasterProcedureEnv env) { + return this.hostingServer; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java new file mode 100644 index 0000000..cb3861a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; + +/** + * Utility for this assignment package only. + */ +@InterfaceAudience.Private +class Util { + private Util() {} + + /** + * Raw call to remote regionserver to get info on a particular region. + * @throws IOException Let it out so can report this IOE as reason for failure + */ + static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env, + final ServerName regionLocation, final HRegionInfo hri) + throws IOException { + // TODO: There is no timeout on this controller. Set one! + HBaseRpcController controller = env.getMasterServices().getClusterConnection(). 
+ getRpcControllerFactory().newController(); + final AdminService.BlockingInterface admin = + env.getMasterServices().getClusterConnection().getAdmin(regionLocation); + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName()); + try { + return admin.getRegionInfo(controller, request); + } catch (ServiceException e) { + throw ProtobufUtil.handleRemoteException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 6410375..a494ecc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -1,4 +1,4 @@ -/** + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -62,9 +62,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** - * The base class for load balancers. It provides functions used by - * {@link org.apache.hadoop.hbase.master.AssignmentManager} to assign regions in the edge cases. - * It doesn't provide an implementation of the actual balancing algorithm. + * The base class for load balancers. It provides the functions used by + * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions + * in the edge cases. It doesn't provide an implementation of the + * actual balancing algorithm.
+ * */ public abstract class BaseLoadBalancer implements LoadBalancer { protected static final int MIN_SERVER_BALANCE = 2; @@ -202,7 +204,15 @@ public abstract class BaseLoadBalancer implements LoadBalancer { // Use servername and port as there can be dead servers in this list. We want everything with // a matching hostname and port to have the same index. for (ServerName sn : clusterState.keySet()) { - if (serversToIndex.get(sn.getHostAndPort()) == null) { + if (sn == null) { + LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + + "skipping; unassigned regions?"); + if (LOG.isTraceEnabled()) { + LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); + } + continue; + } + if (serversToIndex.get(sn.getAddress().toString()) == null) { serversToIndex.put(sn.getHostAndPort(), numServers++); } if (!hostsToIndex.containsKey(sn.getHostname())) { @@ -257,6 +267,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) { + if (entry.getKey() == null) { + LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); + continue; + } int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); // keep the servername if this is the first server name for this hostname @@ -585,8 +599,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { /** * Return true if the placement of region on server would lower the availability * of the region in question - * @param server - * @param region * @return true or false */ boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) { @@ -899,8 +911,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } if (leastLoadedServerIndex != -1) { - LOG.debug("Pick the least loaded server " + servers[leastLoadedServerIndex].getHostname() - + " with better locality for region " + regions[region]); + if 
(LOG.isTraceEnabled()) { + LOG.trace("Pick the least loaded server " + + servers[leastLoadedServerIndex].getHostname() + + " with better locality for region " + regions[region].getShortNameToLog()); + } } return leastLoadedServerIndex; } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java index fd98c9c..a8e22ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java @@ -469,6 +469,10 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements } } + public synchronized List<ServerName> getFavoredNodes(HRegionInfo regionInfo) { + return this.fnm.getFavoredNodes(regionInfo); + } + /* * Generate Favored Nodes for daughters during region split. * @@ -709,7 +713,12 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements // No favored nodes, lets unassign. LOG.warn("Region not on favored nodes, unassign. 
Region: " + hri + " current: " + current + " favored nodes: " + favoredNodes); - this.services.getAssignmentManager().unassign(hri); + try { + this.services.getAssignmentManager().unassign(hri); + } catch (IOException e) { + LOG.warn("Failed unassign", e); + continue; + } RegionPlan rp = new RegionPlan(hri, null, null); regionPlans.add(rp); misplacedRegions++; http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index f7e166d..907e745 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -39,9 +38,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -149,19 +147,15 @@ class RegionLocationFinder { if (services == null) { return false; } - AssignmentManager am = services.getAssignmentManager(); 
+ final AssignmentManager am = services.getAssignmentManager(); if (am == null) { return false; } - RegionStates regionStates = am.getRegionStates(); - if (regionStates == null) { - return false; - } - Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet(); + // TODO: Should this refresh all the regions or only the ones assigned? boolean includesUserTables = false; - for (final HRegionInfo hri : regions) { + for (final HRegionInfo hri : am.getAssignedRegions()) { cache.refresh(hri); includesUserTables = includesUserTables || !hri.isSystemTable(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java index 7e8d696..818156d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java @@ -20,28 +20,27 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Random; import java.util.TreeMap; -import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import 
org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.util.Pair; import com.google.common.collect.MinMaxPriorityQueue; -import org.apache.hadoop.hbase.util.Pair; /** * Makes decisions about the placement and movement of Regions across @@ -54,7 +53,7 @@ import org.apache.hadoop.hbase.util.Pair; * locations for all Regions in a cluster. * * <p>This classes produces plans for the - * {@link org.apache.hadoop.hbase.master.AssignmentManager} to execute. + * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class SimpleLoadBalancer extends BaseLoadBalancer { http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 53db1f2..4b96bc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -293,9 +293,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { if (total <= 0 || sumMultiplier <= 0 || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) { - LOG.info("Skipping load balancing because balanced cluster; " + "total cost is " + total + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total + ", sum multiplier is " + sumMultiplier + " min cost which need balance is " + 
minCostNeedBalance); + } return false; } return true; @@ -1153,11 +1155,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { stats = new double[cluster.numServers]; } - for (int i =0; i < cluster.numServers; i++) { + for (int i = 0; i < cluster.numServers; i++) { stats[i] = 0; for (int regionIdx : cluster.regionsPerServer[i]) { if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) { - stats[i] ++; + stats[i]++; } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java index 512f7e2..edbba83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -232,7 +232,8 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv> } @Override - protected Procedure<?>[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException { + protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env) + throws ProcedureSuspendedException { // Local master locks don't store any state, so on recovery, simply finish this procedure // immediately. 
if (recoveredMasterLock) return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java index 03fdaef..6ebadb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java @@ -52,9 +52,8 @@ public abstract class AbstractStateMachineNamespaceProcedure<TState> @Override public void toStringClassDetails(final StringBuilder sb) { sb.append(getClass().getSimpleName()); - sb.append(" (namespace="); + sb.append(", namespace="); sb.append(getNamespaceName()); - sb.append(")"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java new file mode 100644 index 0000000..41502d4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +/** + * Base class for all the Region procedures that want to use a StateMachine. + * It provides some basic helpers like basic locking, sync latch, and toStringClassDetails(). + * Defaults to holding the lock for the life of the procedure. + */ +@InterfaceAudience.Private +public abstract class AbstractStateMachineRegionProcedure<TState> + extends AbstractStateMachineTableProcedure<TState> { + private HRegionInfo hri; + private volatile boolean lock = false; + + public AbstractStateMachineRegionProcedure(final MasterProcedureEnv env, + final HRegionInfo hri) { + super(env); + this.hri = hri; + } + + public AbstractStateMachineRegionProcedure() { + // Required by the Procedure framework to create the procedure on replay + super(); + } + + /** + * @return The HRegionInfo of the region we are operating on. + */ + protected HRegionInfo getRegion() { + return this.hri; + } + + /** + * Used when deserializing. 
Otherwise, DON'T TOUCH IT! + */ + protected void setRegion(final HRegionInfo hri) { + this.hri = hri; + } + + @Override + public TableName getTableName() { + return getRegion().getTable(); + } + + @Override + public abstract TableOperationType getTableOperationType(); + + @Override + public void toStringClassDetails(final StringBuilder sb) { + super.toStringClassDetails(sb); + sb.append(", region=").append(getRegion().getShortNameToLog()); + } + + /** + * Check whether a table is modifiable - exists and either offline or online with config set + * @param env MasterProcedureEnv + * @throws IOException + */ + protected void checkTableModifiable(final MasterProcedureEnv env) throws IOException { + // Checks whether the table exists + if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), getTableName())) { + throw new TableNotFoundException(getTableName()); + } + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } + + protected LockState acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; + if (env.getProcedureScheduler().waitRegions(this, getTableName(), getRegion())) { + return LockState.LOCK_EVENT_WAIT; + } + this.lock = true; + return LockState.LOCK_ACQUIRED; + } + + protected void releaseLock(final MasterProcedureEnv env) { + this.lock = false; + env.getProcedureScheduler().wakeRegions(this, getTableName(), getRegion()); + } + + @Override + protected boolean hasLock(final MasterProcedureEnv env) { + return this.lock; + } + + protected void setFailure(Throwable cause) { + super.setFailure(getClass().getSimpleName(), cause); + } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + HRegionInfo.convert(getRegion()).writeDelimitedTo(stream); + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + 
super.deserializeStateData(stream); + this.hri = HRegionInfo.convert(HBaseProtos.RegionInfo.parseDelimitedFrom(stream)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 9f23848..1417159 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.security.User; /** * Base class for all the Table procedures that want to use a StateMachineProcedure. - * It provide some basic helpers like basic locking, sync latch, and basic toStringClassDetails(). + * It provides helpers like basic locking, sync latch, and toStringClassDetails(). */ @InterfaceAudience.Private public abstract class AbstractStateMachineTableProcedure<TState> @@ -50,11 +50,15 @@ public abstract class AbstractStateMachineTableProcedure<TState> this(env, null); } + /** + * @param env Uses this to set Procedure Owner at least. + */ protected AbstractStateMachineTableProcedure(final MasterProcedureEnv env, final ProcedurePrepareLatch latch) { - this.user = env.getRequestUser(); - this.setOwner(user); - + if (env != null) { + this.user = env.getRequestUser(); + this.setOwner(user); + } // used for compatibility with clients without procedures // they need a sync TableExistsException, TableNotFoundException, TableNotDisabledException, ... 
this.syncLatch = latch; @@ -110,4 +114,4 @@ public abstract class AbstractStateMachineTableProcedure<TState> throw new TableNotFoundException(getTableName()); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java index 7bb2887..34c1853 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @@ -100,7 +99,10 @@ public class AddColumnFamilyProcedure setNextState(AddColumnFamilyState.ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS); break; case ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS: - reOpenAllRegionsIfTableIsOnline(env); + if (env.getAssignmentManager().isTableEnabled(getTableName())) { + addChildProcedure(env.getAssignmentManager() + .createReopenProcedures(getRegionInfoList(env))); + } return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException(this + " unhandled state=" + state); @@ -285,7 +287,8 @@ public class AddColumnFamilyProcedure 
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor); // Make sure regions are opened after table descriptor is updated. - reOpenAllRegionsIfTableIsOnline(env); + //reOpenAllRegionsIfTableIsOnline(env); + // TODO: NUKE ROLLBACK!!!! } } @@ -302,25 +305,6 @@ public class AddColumnFamilyProcedure } /** - * Last action from the procedure - executed when online schema change is supported. - * @param env MasterProcedureEnv - * @throws IOException - */ - private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException { - // This operation only run when the table is enabled. - if (!env.getMasterServices().getTableStateManager() - .isTableState(getTableName(), TableState.State.ENABLED)) { - return; - } - - if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) { - LOG.info("Completed add column family operation on table " + getTableName()); - } else { - LOG.warn("Error on reopening the regions on table " + getTableName()); - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. 
* @return traceEnabled @@ -362,7 +346,8 @@ public class AddColumnFamilyProcedure private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException { if (regionInfoList == null) { - regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + regionInfoList = env.getAssignmentManager().getRegionStates() + .getRegionsOfTable(getTableName()); } return regionInfoList; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6ae5b3bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java index 683d840..c1d0326 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java @@ -149,10 +149,12 @@ public class CloneSnapshotProcedure setNextState(CloneSnapshotState.CLONE_SNAPSHOT_ASSIGN_REGIONS); break; case CLONE_SNAPSHOT_ASSIGN_REGIONS: - CreateTableProcedure.assignRegions(env, getTableName(), newRegions); + CreateTableProcedure.setEnablingState(env, getTableName()); + addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions)); setNextState(CloneSnapshotState.CLONE_SNAPSHOT_UPDATE_DESC_CACHE); break; case CLONE_SNAPSHOT_UPDATE_DESC_CACHE: + CreateTableProcedure.setEnabledState(env, getTableName()); CreateTableProcedure.updateTableDescCache(env, getTableName()); setNextState(CloneSnapshotState.CLONE_SNAPHOST_RESTORE_ACL); break;