http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-protocol-shaded/src/main/protobuf/Admin.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index 47d39be..36221c24 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -119,14 +119,14 @@ message CloseRegionResponse { } /** - * Closes the specified region and create - * child region. + * Closes the specified region(s) for + * split or merge */ -message CloseRegionForSplitRequest { - required RegionSpecifier region = 1; +message CloseRegionForSplitOrMergeRequest { + repeated RegionSpecifier region = 1; } -message CloseRegionForSplitResponse { +message CloseRegionForSplitOrMergeResponse { required bool closed = 1; } @@ -295,8 +295,8 @@ service AdminService { rpc CloseRegion(CloseRegionRequest) returns(CloseRegionResponse); - rpc CloseRegionForSplit(CloseRegionForSplitRequest) - returns(CloseRegionForSplitResponse); + rpc CloseRegionForSplitOrMerge(CloseRegionForSplitOrMergeRequest) + returns(CloseRegionForSplitOrMergeResponse); rpc FlushRegion(FlushRegionRequest) returns(FlushRegionResponse);
http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-protocol-shaded/src/main/protobuf/Master.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 9e6d1ed..b283ed9 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -93,6 +93,20 @@ message DispatchMergingRegionsResponse { optional uint64 proc_id = 1; } +/** + * Merging the specified regions in a table. + */ +message MergeTableRegionsRequest { + repeated RegionSpecifier region = 1; + optional bool forcible = 3 [default = false]; + optional uint64 nonce_group = 4 [default = 0]; + optional uint64 nonce = 5 [default = 0]; +} + +message MergeTableRegionsResponse { + optional uint64 proc_id = 1; +} + message AssignRegionRequest { required RegionSpecifier region = 1; } @@ -593,6 +607,10 @@ service MasterService { rpc DispatchMergingRegions(DispatchMergingRegionsRequest) returns(DispatchMergingRegionsResponse); + /** Master merge the regions */ + rpc MergeTableRegions(MergeTableRegionsRequest) + returns(MergeTableRegionsResponse); + /** Assign a region to a server chosen at random. 
*/ rpc AssignRegion(AssignRegionRequest) returns(AssignRegionResponse); http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 8926605..23d914e 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -277,11 +277,32 @@ message DispatchMergingRegionsStateData { optional bool forcible = 4; } +enum MergeTableRegionsState { + MERGE_TABLE_REGIONS_PREPARE = 1; + MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS = 2; + MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION = 3; + MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE = 4; + MERGE_TABLE_REGIONS_CLOSE_REGIONS = 5; + MERGE_TABLE_REGIONS_CREATE_MERGED_REGION = 6; + MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION = 7; + MERGE_TABLE_REGIONS_UPDATE_META = 8; + MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION = 9; + MERGE_TABLE_REGIONS_OPEN_MERGED_REGION = 10; + MERGE_TABLE_REGIONS_POST_OPERATION = 11; +} + +message MergeTableRegionsStateData { + required UserInformation user_info = 1; + repeated RegionInfo region_info = 2; + required RegionInfo merged_region_info = 3; + optional bool forcible = 4 [default = false]; +} + enum SplitTableRegionState { SPLIT_TABLE_REGION_PREPARE = 1; SPLIT_TABLE_REGION_PRE_OPERATION = 2; SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE = 3; - SPLIT_TABLE_REGION_CLOSED_PARENT_REGION = 4; + SPLIT_TABLE_REGION_CLOSE_PARENT_REGION = 4; SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS = 5; SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR = 6; SPLIT_TABLE_REGION_UPDATE_META = 7; http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java 
---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 70167bb..2065939 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -1030,6 +1030,18 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService } @Override + public void preMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override + public void postMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx, Set<HostAndPort> servers, String targetGroup) throws IOException { } @@ -1133,7 +1145,40 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService } @Override - public void preRollBackSplitRegionAction( + public void postRollBackSplitRegionAction( final ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { } + + @Override + public void preMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override + public void postCompletedMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> c, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException { + } + + @Override + public void preMergeRegionsCommitAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + final List<Mutation> metaEntries) throws IOException { + } + + @Override + public void postMergeRegionsCommitAction( + final 
ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException { + } + + @Override + public void postRollBackMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java index 21381e8..93b2085 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; -import org.apache.hadoop.hbase.regionserver.Region; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving @@ -480,6 +479,18 @@ public class BaseMasterAndRegionObserver extends BaseRegionObserver } @Override + public void preMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override + public void postMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override public void preAbortProcedure( ObserverContext<MasterCoprocessorEnvironment> ctx, final 
ProcedureExecutor<MasterProcedureEnv> procEnv, @@ -831,7 +842,40 @@ public class BaseMasterAndRegionObserver extends BaseRegionObserver } @Override - public void preRollBackSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> ctx) + public void postRollBackSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { } + + @Override + public void preMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override + public void postCompletedMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> c, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException { + } + + @Override + public void preMergeRegionsCommitAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + final List<Mutation> metaEntries) throws IOException { + } + + @Override + public void postMergeRegionsCommitAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException { + } + + @Override + public void postRollBackMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java index 4d24a84..23afe4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java @@ 
-43,7 +43,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; -import org.apache.hadoop.hbase.regionserver.Region; @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.CONFIG}) @InterfaceStability.Evolving @@ -755,6 +754,18 @@ public class BaseMasterObserver implements MasterObserver { } @Override + public void preMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override + public void postMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override public void preAbortProcedure( ObserverContext<MasterCoprocessorEnvironment> ctx, final ProcedureExecutor<MasterProcedureEnv> procEnv, @@ -852,11 +863,44 @@ public class BaseMasterObserver implements MasterObserver { } @Override - public void preRollBackSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> ctx) + public void postRollBackSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { } @Override + public void preMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override + public void postCompletedMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> c, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException { + } + + @Override + public void preMergeRegionsCommitAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + final List<Mutation> metaEntries) throws IOException { + } + + @Override + public void 
postMergeRegionsCommitAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException { + } + + @Override + public void postRollBackMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException { + } + + @Override public void preBalance(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { } http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index e90f753..9abcd52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaMutationAnnotation; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.ServerName; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; @@ -1196,14 +1198,74 @@ public interface 
MasterObserver extends Coprocessor { throws IOException; /** - * This will be called before the roll back of the split region is completed + * This will be called after the roll back of the split region is completed * @param ctx the environment to interact with the framework and master * @throws IOException */ - void preRollBackSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> ctx) + void postRollBackSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException; /** + * Called before the regions merge. + * Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} to skip the merge. + * @throws IOException if an error occurred on the coprocessor + * @param ctx + * @param regionsToMerge + * @throws IOException + */ + void preMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException; + + /** + * called after the regions merge. + * @param c + * @param regionsToMerge + * @param mergedRegion + * @throws IOException + */ + void postCompletedMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> c, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException; + + /** + * This will be called before PONR step as part of regions merge transaction. Calling + * {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the merge + * @param ctx + * @param regionsToMerge + * @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates. + * Any puts or deletes to execute on hbase:meta can be added to the mutations. + * @throws IOException + */ + void preMergeRegionsCommitAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException; + + /** + * This will be called after PONR step as part of regions merge transaction. 
+ * @param ctx + * @param regionsToMerge + * @param mergedRegion + * @throws IOException + */ + void postMergeRegionsCommitAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion) throws IOException; + + /** + * This will be called after the roll back of the regions merge. + * @param ctx + * @param regionsToMerge + * @throws IOException + */ + void postRollBackMergeRegionsAction( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException; + + /** * Called prior to modifying the flag used to enable/disable region balancing. * @param ctx the coprocessor instance's environment * @param newValue the new flag value submitted in the call @@ -1651,6 +1713,27 @@ public interface MasterObserver extends Coprocessor { final HRegionInfo regionA, final HRegionInfo regionB) throws IOException; /** + * Called before merge regions request. + * It can't bypass the default action, e.g., ctx.bypass() won't have effect. + * @param ctx coprocessor environment + * @param regionsToMerge regions to be merged + * @throws IOException if an error occurred on the coprocessor + */ + void preMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + final HRegionInfo[] regionsToMerge) throws IOException; + + /** + * called after merge regions request. 
+ * @param c coprocessor environment + * @param regionsToMerge regions to be merged + * @throws IOException if an error occurred on the coprocessor + */ + void postMergeRegions( + final ObserverContext<MasterCoprocessorEnvironment> c, + final HRegionInfo[] regionsToMerge) throws IOException; + + /** * Called before servers are moved to target region server group * @param ctx the environment to interact with the framework and master * @param servers set of servers to move http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 3540b19..a8061a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -2630,6 +2630,39 @@ public class AssignmentManager { return null; } + public void assignMergedRegion( + final HRegionInfo mergedRegion, + final HRegionInfo daughterAHRI, + final HRegionInfo daughterBHRI) throws InterruptedException, IOException { + //Offline the daughter regions + regionOffline(daughterAHRI, State.MERGED); + regionOffline(daughterBHRI, State.MERGED); + + //Set merged region to offline + regionStates.prepareAssignMergedRegion(mergedRegion); + + // Assign merged region + invokeAssign(mergedRegion); + + Callable<Object> mergeReplicasCallable = new Callable<Object>() { + @Override + public Object call() { + doMergingOfReplicas(mergedRegion, daughterAHRI, daughterBHRI); + return null; + } + }; + threadPoolExecutorService.submit(mergeReplicasCallable); + + // wait for assignment completion + ArrayList<HRegionInfo> regionAssignSet = new ArrayList<HRegionInfo>(1); + regionAssignSet.add(mergedRegion); + 
while (!waitForAssignment(regionAssignSet, true, regionAssignSet.size(), Long.MAX_VALUE)) { + LOG.debug("The merged region " + mergedRegion + " is still in transition. "); + } + + regionStateListener.onRegionMerged(mergedRegion); + } + private String onRegionMerged(final RegionState current, final HRegionInfo hri, final ServerName serverName, final RegionStateTransition transition) { // The region must be in merging_new state, and the daughters must be http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 5f2e2a6..710c48a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MergeTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -1420,6 +1421,50 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override + public long mergeRegions( + final HRegionInfo[] regionsToMerge, + final boolean forcible, + final long nonceGroup, + final long nonce) throws IOException { + checkInitialized(); + + assert(regionsToMerge.length == 2); + + TableName tableName = regionsToMerge[0].getTable(); + if 
(tableName == null || regionsToMerge[1].getTable() == null) { + throw new UnknownRegionException ("Can't merge regions without table associated"); + } + + if (!tableName.equals(regionsToMerge[1].getTable())) { + throw new IOException ( + "Cannot merge regions from two different tables " + regionsToMerge[0].getTable() + + " and " + regionsToMerge[1].getTable()); + } + + if (regionsToMerge[0].compareTo(regionsToMerge[1]) == 0) { + throw new MergeRegionException( + "Cannot merge a region to itself " + regionsToMerge[0] + ", " + regionsToMerge[1]); + } + + if (cpHost != null) { + cpHost.preMergeRegions(regionsToMerge); + } + + LOG.info(getClientIdAuditPrefix() + " Merge regions " + + regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName()); + + long procId = this.procedureExecutor.submitProcedure( + new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(), regionsToMerge, forcible), + nonceGroup, + nonce); + + if (cpHost != null) { + cpHost.postMergeRegions(regionsToMerge); + } + return procId; + } + + @Override public long splitRegion( final HRegionInfo regionInfo, final byte[] splitRow, http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index d0ac765..a18068d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import 
org.apache.hadoop.hbase.MetaMutationAnnotation; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.ServerName; @@ -792,6 +793,28 @@ public class MasterCoprocessorHost }); } + public void preMergeRegions(final HRegionInfo[] regionsToMerge) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.preMergeRegions(ctx, regionsToMerge); + } + }); + } + + public void postMergeRegions(final HRegionInfo[] regionsToMerge) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.postMergeRegions(ctx, regionsToMerge); + } + }); + } + public boolean preBalance() throws IOException { return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { @Override @@ -928,16 +951,110 @@ public class MasterCoprocessorHost } /** - * Invoked just before the rollback of a failed split is started + * Invoked just after the rollback of a failed split * @param user the user * @throws IOException */ - public void preRollBackSplitAction(final User user) throws IOException { + public void postRollBackSplitRegionAction(final User user) throws IOException { execOperation(coprocessors.isEmpty() ? 
null : new CoprocessorOperation(user) { @Override public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { - oserver.preRollBackSplitRegionAction(ctx); + oserver.postRollBackSplitRegionAction(ctx); + } + }); + } + + /** + * Invoked just before a merge + * @param regionsToMerge the regions to merge + * @param user the user + * @throws IOException + */ + public boolean preMergeRegionsAction( + final HRegionInfo[] regionsToMerge, final User user) throws IOException { + return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) { + @Override + public void call(MasterObserver oserver, + ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { + oserver.preMergeRegionsAction(ctx, regionsToMerge); + } + }); + } + + /** + * Invoked after completing merge regions operation + * @param regionsToMerge the regions to merge + * @param mergedRegion the new merged region + * @param user the user + * @throws IOException + */ + public void postCompletedMergeRegionsAction( + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion, + final User user) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) { + @Override + public void call(MasterObserver oserver, + ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { + oserver.postCompletedMergeRegionsAction(ctx, regionsToMerge, mergedRegion); + } + }); + } + + /** + * Invoked before merge regions operation writes the new region to hbase:meta + * @param regionsToMerge the regions to merge + * @param metaEntries the meta entry + * @param user the user + * @throws IOException + */ + public boolean preMergeRegionsCommit( + final HRegionInfo[] regionsToMerge, + final @MetaMutationAnnotation List<Mutation> metaEntries, + final User user) throws IOException { + return execOperation(coprocessors.isEmpty() ? 
null : new CoprocessorOperation(user) { + @Override + public void call(MasterObserver oserver, + ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { + oserver.preMergeRegionsCommitAction(ctx, regionsToMerge, metaEntries); + } + }); + } + + /** + * Invoked after merge regions operation writes the new region to hbase:meta + * @param regionsToMerge the regions to merge + * @param mergedRegion the new merged region + * @param user the user + * @throws IOException + */ + public void postMergeRegionsCommit( + final HRegionInfo[] regionsToMerge, + final HRegionInfo mergedRegion, + final User user) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) { + @Override + public void call(MasterObserver oserver, + ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { + oserver.postMergeRegionsCommitAction(ctx, regionsToMerge, mergedRegion); + } + }); + } + + /** + * Invoked after rollback merge regions operation + * @param regionsToMerge the regions to merge + * @param user the user + * @throws IOException + */ + public void postRollBackMergeRegionsAction( + final HRegionInfo[] regionsToMerge, final User user) throws IOException { + execOperation(coprocessors.isEmpty() ? 
null : new CoprocessorOperation(user) { + @Override + public void call(MasterObserver oserver, + ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { + oserver.postRollBackMergeRegionsAction(ctx, regionsToMerge); } }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 97eb209..709b3f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -553,6 +553,46 @@ public class MasterRpcServices extends RSRpcServices } @Override + public MergeTableRegionsResponse mergeTableRegions( + RpcController c, MergeTableRegionsRequest request) throws ServiceException { + try { + master.checkInitialized(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + + RegionStates regionStates = master.getAssignmentManager().getRegionStates(); + + assert(request.getRegionCount() == 2); + HRegionInfo[] regionsToMerge = new HRegionInfo[request.getRegionCount()]; + for (int i = 0; i < request.getRegionCount(); i++) { + final byte[] encodedNameOfRegion = request.getRegion(i).getValue().toByteArray(); + if (request.getRegion(i).getType() != RegionSpecifierType.ENCODED_REGION_NAME) { + LOG.warn("MergeRegions specifier type: expected: " + + RegionSpecifierType.ENCODED_REGION_NAME + " actual: region " + i + " =" + + request.getRegion(i).getType()); + } + RegionState regionState = regionStates.getRegionState(Bytes.toString(encodedNameOfRegion)); + if (regionState == null) { + throw new ServiceException( + new UnknownRegionException(Bytes.toStringBinary(encodedNameOfRegion))); + } + regionsToMerge[i] 
= regionState.getRegion(); + } + + try { + long procId = master.mergeRegions( + regionsToMerge, + request.getForcible(), + request.getNonceGroup(), + request.getNonce()); + return MergeTableRegionsResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override public SplitTableRegionResponse splitRegion( final RpcController controller, final SplitTableRegionRequest request) throws ServiceException { http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index fa1c33d..a4c27f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -265,6 +265,21 @@ public interface MasterServices extends Server { throws IOException; /** + * Merge regions in a table. + * @param regionsToMerge daughter regions to merge + * @param forcible whether to force to merge even two regions are not adjacent + * @param nonceGroup used to detect duplicate + * @param nonce used to detect duplicate + * @return procedure Id + * @throws IOException + */ + long mergeRegions( + final HRegionInfo[] regionsToMerge, + final boolean forcible, + final long nonceGroup, + final long nonce) throws IOException; + + /** * Split a region. 
* @param regionInfo region to split * @param splitRow split point @@ -273,7 +288,7 @@ public interface MasterServices extends Server { * @return procedure Id * @throws IOException */ - public long splitRegion( + long splitRegion( final HRegionInfo regionInfo, final byte [] splitRow, final long nonceGroup, http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index b199374..7c2df61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -896,6 +896,14 @@ public class RegionStates { } } + public void prepareAssignMergedRegion(HRegionInfo mergedRegion) { + synchronized (this) { + if (isRegionInState(mergedRegion, State.MERGING_NEW)) { + updateRegionState(mergedRegion, State.OFFLINE, null); + } + } + } + void splitRegion(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index a567e1d..b76cd7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -823,23 +823,22 @@ public class ServerManager { * A region server could reject the close request because it either does not * have the specified region or the region is being 
split. * @param server server to close a region - * @param regionToClose the info of the region to close + * @param regionToClose the info of the region(s) to close * @throws IOException */ - public boolean sendRegionCloseForSplit( + public boolean sendRegionCloseForSplitOrMerge( final ServerName server, - final HRegionInfo regionToClose) throws IOException { + final HRegionInfo... regionToClose) throws IOException { if (server == null) { throw new NullPointerException("Passed server is null"); } AdminService.BlockingInterface admin = getRsAdmin(server); if (admin == null) { - throw new IOException("Attempting to send CLOSE For Split RPC to server " + - server.toString() + " for region " + regionToClose.getRegionNameAsString() + - " failed because no RPC connection found to this server"); + throw new IOException("Attempting to send CLOSE For Split or Merge RPC to server " + + server.toString() + " failed because no RPC connection found to this server."); } HBaseRpcController controller = newRpcController(); - return ProtobufUtil.closeRegionForSplit(controller, admin, server, regionToClose); + return ProtobufUtil.closeRegionForSplitOrMerge(controller, admin, server, regionToClose); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java new file mode 100644 index 0000000..c313700 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java @@ -0,0 +1,907 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaMutationAnnotation; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.exceptions.MergeRegionException; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import 
org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.CatalogJanitor; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; + +/** + * The procedure to Merge a region in a table. 
+ */ +@InterfaceAudience.Private +public class MergeTableRegionsProcedure + extends AbstractStateMachineTableProcedure<MergeTableRegionsState> { + private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class); + + private Boolean traceEnabled; + private AssignmentManager assignmentManager; + private int timeout; + private ServerName regionLocation; + private String regionsToMergeListFullName; + private String regionsToMergeListEncodedName; + + private HRegionInfo [] regionsToMerge; + private HRegionInfo mergedRegionInfo; + private boolean forcible; + + public MergeTableRegionsProcedure() { + this.traceEnabled = isTraceEnabled(); + this.assignmentManager = null; + this.timeout = -1; + this.regionLocation = null; + this.regionsToMergeListFullName = null; + this.regionsToMergeListEncodedName = null; + } + + public MergeTableRegionsProcedure( + final MasterProcedureEnv env, + final HRegionInfo[] regionsToMerge, + final boolean forcible) throws IOException { + super(env); + this.traceEnabled = isTraceEnabled(); + this.assignmentManager = getAssignmentManager(env); + // For now, we only merge 2 regions. It could be extended to more than 2 regions in + // the future. + assert(regionsToMerge.length == 2); + assert(regionsToMerge[0].getTable() == regionsToMerge[1].getTable()); + this.regionsToMerge = regionsToMerge; + this.forcible = forcible; + + this.timeout = -1; + this.regionsToMergeListFullName = getRegionsToMergeListFullNameString(); + this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString(); + + // Check daughter regions and make sure that we have valid daughter regions before + // doing the real work. 
+ checkDaughterRegions(); + // WARN: make sure there is no parent region of the two merging regions in + // hbase:meta If exists, fixing up daughters would cause daughter regions(we + // have merged one) online again when we restart master, so we should clear + // the parent region to prevent the above case + // Since HBASE-7721, we don't need fix up daughters any more. so here do + // nothing + setupMergedRegionInfo(); + } + + @Override + protected Flow executeFromState( + final MasterProcedureEnv env, + final MergeTableRegionsState state) throws InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + + try { + switch (state) { + case MERGE_TABLE_REGIONS_PREPARE: + prepareMergeRegion(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS); + break; + case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS: + if (MoveRegionsToSameRS(env)) { + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION); + } else { + LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString() + + ", because can't move them to the same RS"); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION); + } + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: + preMergeRegions(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE); + break; + case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE: + setRegionStateToMerging(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS); + break; + case MERGE_TABLE_REGIONS_CLOSE_REGIONS: + closeRegionsForMerge(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION); + break; + case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: + createMergedRegion(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: + preMergeRegionsCommit(env); + 
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META); + break; + case MERGE_TABLE_REGIONS_UPDATE_META: + updateMetaForMergedRegions(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION); + break; + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + postMergeRegionsCommit(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION); + break; + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + openMergedRegions(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION); + break; + case MERGE_TABLE_REGIONS_POST_OPERATION: + postCompletedMergeRegions(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() + + " in the table " + getTableName() + " (in state=" + state + ")", e); + + setFailure("master-merge-regions", e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState( + final MasterProcedureEnv env, + final MergeTableRegionsState state) throws IOException, InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + + try { + switch (state) { + case MERGE_TABLE_REGIONS_POST_OPERATION: + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + case MERGE_TABLE_REGIONS_UPDATE_META: + String msg = this + " We are in the " + state + " state." + + " It is complicated to rollback the merge operation that region server is working on." 
+ + " Rollback is not supported and we should let the merge operation to complete"; + LOG.warn(msg); + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: + break; + case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: + cleanupMergedRegion(env); + break; + case MERGE_TABLE_REGIONS_CLOSE_REGIONS: + rollbackCloseRegionsForMerge(env); + break; + case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE: + setRegionStateToRevertMerging(env); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: + postRollBackMergeRegions(env); + break; + case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS: + break; // nothing to rollback + case MERGE_TABLE_REGIONS_PREPARE: + break; // nothing to rollback + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (Exception e) { + // This will be retried. Unless there is a bug in the code, + // this should be just a "temporary error" (e.g. network down) + LOG.warn("Failed rollback attempt step " + state + " for merging the regions " + + getRegionsToMergeListFullNameString() + " in table " + getTableName(), e); + throw e; + } + } + + /* + * Check whether we are in the state that can be rollback + */ + @Override + protected boolean isRollbackSupported(final MergeTableRegionsState state) { + switch (state) { + case MERGE_TABLE_REGIONS_POST_OPERATION: + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + case MERGE_TABLE_REGIONS_UPDATE_META: + // It is not safe to rollback if we reach to these states. 
+ return false; + default: + break; + } + return true; + } + + @Override + protected MergeTableRegionsState getState(final int stateId) { + return MergeTableRegionsState.valueOf(stateId); + } + + @Override + protected int getStateId(final MergeTableRegionsState state) { + return state.getNumber(); + } + + @Override + protected MergeTableRegionsState getInitialState() { + return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE; + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + MasterProcedureProtos.MergeTableRegionsStateData.Builder mergeTableRegionsMsg = + MasterProcedureProtos.MergeTableRegionsStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) + .setMergedRegionInfo(HRegionInfo.convert(mergedRegionInfo)) + .setForcible(forcible); + for (HRegionInfo hri: regionsToMerge) { + mergeTableRegionsMsg.addRegionInfo(HRegionInfo.convert(hri)); + } + mergeTableRegionsMsg.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg = + MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream); + setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo())); + + assert(mergeTableRegionsMsg.getRegionInfoCount() == 2); + regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()]; + for (int i = 0; i < regionsToMerge.length; i++) { + regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i)); + } + + mergedRegionInfo = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo()); + // serializeStateData() persists forcible, so restore it too; otherwise it reads false after reload + forcible = mergeTableRegionsMsg.getForcible(); + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (table="); + sb.append(getTableName()); + sb.append(" regions="); + 
sb.append(getRegionsToMergeListFullNameString()); + sb.append(" forcible="); + sb.append(forcible); + sb.append(")"); + } + + @Override + protected boolean acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) { + return false; + } + return !env.getProcedureQueue().waitRegions( + this, getTableName(), regionsToMerge[0], regionsToMerge[1]); + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureQueue().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]); + } + + @Override + public TableName getTableName() { + return regionsToMerge[0].getTable(); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.MERGE; + } + + /** + * check daughter regions + * @throws IOException + */ + private void checkDaughterRegions() throws IOException { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. + // + if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || + regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + throw new MergeRegionException("Can't merge non-default replicas"); + } + + if (!HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) { + String msg = "Trying to merge non-adjacent regions " + + getRegionsToMergeListFullNameString() + " where forcible = " + forcible; + LOG.warn(msg); + if (!forcible) { + throw new DoNotRetryIOException(msg); + } + } + } + + /** + * Prepare merge and do some check + * @param env MasterProcedureEnv + * @throws IOException + */ + private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. 
+ // + CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor(); + boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]); + if (regionAHasMergeQualifier + || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) { + String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() + + ", because region " + + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1] + .getEncodedName()) + " has merge qualifier"; + LOG.warn(msg); + throw new MergeRegionException(msg); + } + + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName()); + RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName()); + if (regionStateA == null || regionStateB == null) { + throw new UnknownRegionException( + regionStateA == null ? + regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName()); + } + + if (!regionStateA.isOpened() || !regionStateB.isOpened()) { + throw new MergeRegionException( + "Unable to merge regions not online " + regionStateA + ", " + regionStateB); + } + } + + /** + * Create merged region info through the specified two regions + */ + private void setupMergedRegionInfo() { + long rid = EnvironmentEdgeManager.currentTime(); + // Regionid is timestamp. 
Merged region's id can't be less than that of + // merging regions else will insert at wrong location in hbase:meta + if (rid < regionsToMerge[0].getRegionId() || rid < regionsToMerge[1].getRegionId()) { + LOG.warn("Clock skew; merging regions id are " + regionsToMerge[0].getRegionId() + + " and " + regionsToMerge[1].getRegionId() + ", but current time here is " + rid); + rid = Math.max(regionsToMerge[0].getRegionId(), regionsToMerge[1].getRegionId()) + 1; + } + + byte[] startKey = null; + byte[] endKey = null; + // Choose the smaller as start key + if (regionsToMerge[0].compareTo(regionsToMerge[1]) <= 0) { + startKey = regionsToMerge[0].getStartKey(); + } else { + startKey = regionsToMerge[1].getStartKey(); + } + // Choose the bigger as end key + if (Bytes.equals(regionsToMerge[0].getEndKey(), HConstants.EMPTY_BYTE_ARRAY) + || (!Bytes.equals(regionsToMerge[1].getEndKey(), HConstants.EMPTY_BYTE_ARRAY) + && Bytes.compareTo(regionsToMerge[0].getEndKey(), regionsToMerge[1].getEndKey()) > 0)) { + endKey = regionsToMerge[0].getEndKey(); + } else { + endKey = regionsToMerge[1].getEndKey(); + } + + // Merged region is sorted between two merging regions in META + mergedRegionInfo = new HRegionInfo(getTableName(), startKey, endKey, false, rid); + } + + /** + * Move all regions to the same region server + * @param env MasterProcedureEnv + * @return whether target regions hosted by the same RS + * @throws IOException + */ + private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException { + // Make sure regions are on the same regionserver before send merge + // regions request to region server. + // + boolean onSameRS = isRegionsOnTheSameServer(env); + if (!onSameRS) { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. 
+ // + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); + + RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]); + RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]); + if (loadOfRegionA != null && loadOfRegionB != null + && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) { + // switch regionsToMerge[0] and regionsToMerge[1] + HRegionInfo tmpRegion = this.regionsToMerge[0]; + this.regionsToMerge[0] = this.regionsToMerge[1]; + this.regionsToMerge[1] = tmpRegion; + ServerName tmpLocation = regionLocation; + regionLocation = regionLocation2; + regionLocation2 = tmpLocation; + } + + long startTime = EnvironmentEdgeManager.currentTime(); + + RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation); + LOG.info("Moving regions to same server for merge: " + regionPlan.toString()); + getAssignmentManager(env).balance(regionPlan); + do { + try { + Thread.sleep(20); + // Make sure check RIT first, then get region location, otherwise + // we would make a wrong result if region is online between getting + // region location and checking RIT + boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]); + regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); + onSameRS = regionLocation.equals(regionLocation2); + if (onSameRS || !isRIT) { + // Regions are on the same RS, or regionsToMerge[1] is not in + // RegionInTransition any more + break; + } + } catch (InterruptedException e) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(e); + throw iioe; + } + } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); + } + return onSameRS; + } + + /** + * Pre merge region action + * @param env MasterProcedureEnv + **/ + private void preMergeRegions(final MasterProcedureEnv env) 
throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser()); + if (ret) { + throw new IOException( + "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge."); + } + } + } + + /** + * Action after rollback a merge table regions action. + * @param env MasterProcedureEnv + * @throws IOException + */ + private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser()); + } + } + + /** + * Set the region states to MERGING state + * @param env MasterProcedureEnv + * @throws IOException + */ + public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException { + RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); + transition.setTransitionCode(TransitionCode.READY_TO_MERGE); + transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo)); + transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0])); + transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1])); + if (env.getMasterServices().getAssignmentManager().onRegionTransition( + getServerName(env), transition.build()) != null) { + throw new IOException("Failed to update region state to MERGING for " + + getRegionsToMergeListFullNameString()); + } + } + + /** + * Rollback the region state change + * @param env MasterProcedureEnv + * @throws IOException + */ + private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException { + RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); + transition.setTransitionCode(TransitionCode.MERGE_REVERTED); + transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo)); + transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0])); 
+ transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1])); + String msg = env.getMasterServices().getAssignmentManager().onRegionTransition( + getServerName(env), transition.build()); + if (msg != null) { + // If daughter regions are online, the msg is coming from RPC retry. Ignore it. + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + if (!regionStates.isRegionOnline(regionsToMerge[0]) || + !regionStates.isRegionOnline(regionsToMerge[1])) { + throw new IOException("Failed to update region state for " + + getRegionsToMergeListFullNameString() + + " as part of operation for reverting merge. Error message: " + msg); + } + } + } + + /** + * Create merged region + * @param env MasterProcedureEnv + * @throws IOException + */ + private void createMergedRegion(final MasterProcedureEnv env) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable()); + final FileSystem fs = mfs.getFileSystem(); + HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false); + regionFs.createMergesDir(); + + mergeStoreFiles(env, regionFs, regionFs.getMergesDir()); + HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false); + mergeStoreFiles(env, regionFs2, regionFs.getMergesDir()); + + regionFs.commitMergedRegion(mergedRegionInfo); + } + + /** + * Create reference file(s) of merging regions under the merges directory + * @param env MasterProcedureEnv + * @param regionFs region file system + * @param mergedDir the temp directory of merged region + * @throws IOException + */ + private void mergeStoreFiles( + final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir) + throws IOException { + final MasterFileSystem mfs = 
env.getMasterServices().getMasterFileSystem(); + final Configuration conf = env.getMasterConfiguration(); + final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); + + for (String family: regionFs.getFamilies()) { + final HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(family)); + final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family); + + if (storeFiles != null && storeFiles.size() > 0) { + final CacheConfig cacheConf = new CacheConfig(conf, hcd); + for (StoreFileInfo storeFileInfo: storeFiles) { + // Create reference file(s) of the region in mergedDir + regionFs.mergeStoreFile( + mergedRegionInfo, + family, + new StoreFile( + mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()), + mergedDir); + } + } + } + } + + /** + * Clean up merged region + * @param env MasterProcedureEnv + * @throws IOException + */ + private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable()); + final FileSystem fs = mfs.getFileSystem(); + HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false); + regionFs.cleanupMergedRegion(mergedRegionInfo); + } + + /** + * RPC to the region server that hosts the regions to merge, asking it to close these regions + * @param env MasterProcedureEnv + * @throws IOException + */ + private void closeRegionsForMerge(final MasterProcedureEnv env) throws IOException { + boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge( + getServerName(env), regionsToMerge[0], regionsToMerge[1]); + if (!success) { + throw new IOException("Close regions " + getRegionsToMergeListFullNameString() + + " for merging failed. 
Check region server log for more details."); + } + } + + /** + * Rollback close regions + * @param env MasterProcedureEnv + **/ + private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException { + // Check whether the region is closed; if so, open it in the same server + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + for(int i = 1; i < regionsToMerge.length; i++) { + RegionState state = regionStates.getRegionState(regionsToMerge[i]); + if (state != null && (state.isClosing() || state.isClosed())) { + env.getMasterServices().getServerManager().sendRegionOpen( + getServerName(env), + regionsToMerge[i], + ServerName.EMPTY_SERVER_LIST); + } + } + } + + /** + * Post merge region action + * @param env MasterProcedureEnv + **/ + private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + @MetaMutationAnnotation + final List<Mutation> metaEntries = new ArrayList<Mutation>(); + boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser()); + + if (ret) { + throw new IOException( + "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge."); + } + try { + for (Mutation p : metaEntries) { + HRegionInfo.parseRegionName(p.getRow()); + } + } catch (IOException e) { + LOG.error("Row key of mutation from coprocessor is not parsable as region name." + + "Mutations from coprocessor should only be for hbase:meta table.", e); + throw e; + } + } + } + + /** + * Add merged region to META and delete original regions. 
+ * @param env MasterProcedureEnv + * @throws IOException + */ + private void updateMetaForMergedRegions(final MasterProcedureEnv env) throws IOException { + RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); + transition.setTransitionCode(TransitionCode.MERGE_PONR); + transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo)); + transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0])); + transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1])); + // Add merged region and delete original regions + // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region + // will determine whether the region is merged or not in case of failures. + if (env.getMasterServices().getAssignmentManager().onRegionTransition( + getServerName(env), transition.build()) != null) { + throw new IOException("Failed to update meta to add merged region that merges " + + getRegionsToMergeListFullNameString()); + } + } + + /** + * Post merge region action + * @param env MasterProcedureEnv + **/ + private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegionInfo, getUser()); + } + } + + /** + * Assign merged region + * @param env MasterProcedureEnv + * @throws IOException + * @throws InterruptedException + **/ + private void openMergedRegions(final MasterProcedureEnv env) + throws IOException, InterruptedException { + // Check whether the merged region is already opened; if so, + // this is retry and we should just ignore. 
+ RegionState regionState = + getAssignmentManager(env).getRegionStates().getRegionState(mergedRegionInfo); + if (regionState != null && regionState.isOpened()) { + LOG.info("Skip opening merged region " + mergedRegionInfo.getRegionNameAsString() + + " as it is already opened."); + return; + } + + // TODO: The new AM should provide an API to force assign the merged region to the same RS + // as daughter regions; if the RS is unavailable, then assign to a different RS. + env.getMasterServices().getAssignmentManager().assignMergedRegion( + mergedRegionInfo, regionsToMerge[0], regionsToMerge[1]); + } + + /** + * Post merge region action + * @param env MasterProcedureEnv + **/ + private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegionInfo, getUser()); + } + } + + private RegionLoad getRegionLoad( + final MasterProcedureEnv env, + final ServerName sn, + final HRegionInfo hri) { + ServerManager serverManager = env.getMasterServices().getServerManager(); + ServerLoad load = serverManager.getLoad(sn); + if (load != null) { + Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad(); + if (regionsLoad != null) { + return regionsLoad.get(hri.getRegionName()); + } + } + return null; + } + + /** + * Check whether all the regions to merge are hosted by the same region server; an IOException + * is thrown if any of them is not online. 
+ * @param env MasterProcedureEnv + * @return whether target regions hosted by the same RS + */ + private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{ + Boolean onSameRS = true; + int i = 0; + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]); + if (regionLocation != null) { + for(i = 1; i < regionsToMerge.length; i++) { + ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]); + if (regionLocation2 != null) { + if (onSameRS) { + onSameRS = regionLocation.equals(regionLocation2); + } + } else { + // At least one region is not online, merge will fail, no need to continue. + break; + } + } + if (i == regionsToMerge.length) { + // Finish checking all regions, return the result; + return onSameRS; + } + } + + // If reaching here, at least one region is not online. + String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() + + ", because region " + regionsToMerge[i].getEncodedName() + " is not online now."; + LOG.warn(msg); + throw new IOException(msg); + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return assignmentManager + */ + private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) { + if (assignmentManager == null) { + assignmentManager = env.getMasterServices().getAssignmentManager(); + } + return assignmentManager; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. 
+ * @param env MasterProcedureEnv + * @return timeout value + */ + private int getTimeout(final MasterProcedureEnv env) { + if (timeout == -1) { + timeout = env.getMasterConfiguration().getInt( + "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000); + } + return timeout; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return serverName + */ + private ServerName getServerName(final MasterProcedureEnv env) { + if (regionLocation == null) { + regionLocation = + getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]); + } + return regionLocation; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * The string is built from the full region names on first use and then cached. + * @return region names in a list + */ + private String getRegionsToMergeListFullNameString() { + if (regionsToMergeListFullName == null) { + StringBuilder sb = new StringBuilder("["); + int i = 0; + while(i < regionsToMerge.length - 1) { + sb.append(regionsToMerge[i].getRegionNameAsString() + ", "); + i++; + } + sb.append(regionsToMerge[i].getRegionNameAsString() + " ]"); + regionsToMergeListFullName = sb.toString(); + } + return regionsToMergeListFullName; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. 
+ * @return encoded region names + */ + private String getRegionsToMergeListEncodedNameString() { + if (regionsToMergeListEncodedName == null) { + StringBuilder sb = new StringBuilder("["); + int i = 0; + while(i < regionsToMerge.length - 1) { + sb.append(regionsToMerge[i].getEncodedName() + ", "); + i++; + } + sb.append(regionsToMerge[i].getEncodedName() + " ]"); + regionsToMergeListEncodedName = sb.toString(); + } + return regionsToMergeListEncodedName; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @return traceEnabled + */ + private Boolean isTraceEnabled() { + if (traceEnabled == null) { + traceEnabled = LOG.isTraceEnabled(); + } + return traceEnabled; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java index 883ac9a..4730ad8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java @@ -161,9 +161,9 @@ public class SplitTableRegionProcedure break; case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE: setRegionStateToSplitting(env); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSED_PARENT_REGION); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION); break; - case SPLIT_TABLE_REGION_CLOSED_PARENT_REGION: + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: closeParentRegionForSplit(env); setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS); break; @@ -242,14 +242,14 @@ public 
class SplitTableRegionProcedure case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: // Doing nothing, as re-open parent region would clean up daughter region directories. break; - case SPLIT_TABLE_REGION_CLOSED_PARENT_REGION: + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: openParentRegion(env); break; case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE: setRegionStateToRevertSplitting(env); break; case SPLIT_TABLE_REGION_PRE_OPERATION: - preSplitRegionRollback(env); + postRollBackSplitRegion(env); break; case SPLIT_TABLE_REGION_PREPARE: break; // nothing to do @@ -408,15 +408,14 @@ public class SplitTableRegionProcedure } /** - * Action during rollback a pre split table region. + * Action after rollback a split table region action. * @param env MasterProcedureEnv - * @param state the procedure state * @throws IOException */ - private void preSplitRegionRollback(final MasterProcedureEnv env) throws IOException { + private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException { final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { - cpHost.preRollBackSplitAction(getUser()); + cpHost.postRollBackSplitRegionAction(getUser()); } } @@ -458,14 +457,13 @@ public class SplitTableRegionProcedure } /** - * RPC to region server that host the parent region, ask for close the parent regions and - * creating daughter regions + * RPC to region server that host the parent region, ask for close the parent regions * @param env MasterProcedureEnv * @throws IOException */ @VisibleForTesting public void closeParentRegionForSplit(final MasterProcedureEnv env) throws IOException { - boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplit( + boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge( getParentRegionState(env).getServerName(), parentHRI); if (!success) { throw new IOException("Close parent region " + parentHRI + " for splitting failed." 
http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 50382a4..d4e80c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -647,7 +647,7 @@ public class HRegionFileSystem { // Merge Helpers // =========================================================================== /** @return {@link Path} to the temp directory used during merge operations */ - Path getMergesDir() { + public Path getMergesDir() { return new Path(getRegionDir(), REGION_MERGES_DIR); } @@ -667,7 +667,7 @@ public class HRegionFileSystem { * @param mergedRegion {@link HRegionInfo} * @throws IOException */ - void cleanupMergedRegion(final HRegionInfo mergedRegion) throws IOException { + public void cleanupMergedRegion(final HRegionInfo mergedRegion) throws IOException { Path regionDir = new Path(this.tableDir, mergedRegion.getEncodedName()); if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) { throw new IOException("Failed delete of " + regionDir); @@ -679,7 +679,7 @@ public class HRegionFileSystem { * @throws IOException If merges dir already exists or we fail to create it. * @see HRegionFileSystem#cleanupMergesDir() */ - void createMergesDir() throws IOException { + public void createMergesDir() throws IOException { Path mergesdir = getMergesDir(); if (fs.exists(mergesdir)) { LOG.info("The " + mergesdir @@ -703,7 +703,7 @@ public class HRegionFileSystem { * @return Path to created reference. 
* @throws IOException */ - Path mergeStoreFile(final HRegionInfo mergedRegion, final String familyName, + public Path mergeStoreFile(final HRegionInfo mergedRegion, final String familyName, final StoreFile f, final Path mergedDir) throws IOException { Path referenceDir = new Path(new Path(mergedDir, @@ -728,7 +728,7 @@ public class HRegionFileSystem { * @param mergedRegionInfo merged region {@link HRegionInfo} * @throws IOException */ - void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException { + public void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException { Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName()); Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo); // Move the tmp dir in the expected location http://git-wip-us.apache.org/repos/asf/hbase/blob/0a240778/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 56fc6eb..3e4a23e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.MalformedObjectNameException; @@ -3026,54 +3025,58 @@ public class HRegionServer extends HasThread implements } /** - * Close and offline the region for split + * Close and offline the region for split or merge * - * @param parentRegionEncodedName the name of the 
region to close - * @return True if closed the region successfully. + * @param regionEncodedName the name of the region(s) to close + * @return true if closed the region successfully. * @throws IOException */ - protected boolean closeAndOfflineRegionForSplit( - final String parentRegionEncodedName) throws IOException { - Region parentRegion = this.getFromOnlineRegions(parentRegionEncodedName); - if (parentRegion != null) { - Map<byte[], List<StoreFile>> hstoreFilesToSplit = null; - Exception exceptionToThrow = null; - try{ - hstoreFilesToSplit = ((HRegion)parentRegion).close(false); - } catch (Exception e) { - exceptionToThrow = e; - } - if (exceptionToThrow == null && hstoreFilesToSplit == null) { - // The region was closed by someone else - exceptionToThrow = - new IOException("Failed to close region: already closed by another thread"); - } + protected boolean closeAndOfflineRegionForSplitOrMerge( + final List<String> regionEncodedName) throws IOException { + for (int i = 0; i < regionEncodedName.size(); ++i) { + Region regionToClose = this.getFromOnlineRegions(regionEncodedName.get(i)); + if (regionToClose != null) { + Map<byte[], List<StoreFile>> hstoreFiles = null; + Exception exceptionToThrow = null; + try{ + hstoreFiles = ((HRegion)regionToClose).close(false); + } catch (Exception e) { + exceptionToThrow = e; + } + if (exceptionToThrow == null && hstoreFiles == null) { + // The region was closed by someone else + exceptionToThrow = + new IOException("Failed to close region: already closed by another thread"); + } - if (exceptionToThrow != null) { - if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; - throw new IOException(exceptionToThrow); - } - if (parentRegion.getTableDesc().hasSerialReplicationScope()) { - // For serial replication, we need add a final barrier on this region. 
But the splitting may - // be reverted, so we should make sure if we reopen this region, the open barrier is same as - // this final barrier - long seq = parentRegion.getMaxFlushedSeqId(); - if (seq == HConstants.NO_SEQNUM) { - // No edits in WAL for this region; get the sequence number when the region was opened. - seq = parentRegion.getOpenSeqNum(); + if (exceptionToThrow != null) { + if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; + throw new IOException(exceptionToThrow); + } + if (regionToClose.getTableDesc().hasSerialReplicationScope()) { + // For serial replication, we need add a final barrier on this region. But the splitting + // or merging may be reverted, so we should make sure if we reopen this region, the open + // barrier is same as this final barrier + long seq = regionToClose.getMaxFlushedSeqId(); if (seq == HConstants.NO_SEQNUM) { - // This region has no data - seq = 0; + // No edits in WAL for this region; get the sequence number when the region was opened. + seq = regionToClose.getOpenSeqNum(); + if (seq == HConstants.NO_SEQNUM) { + // This region has no data + seq = 0; + } + } else { + seq++; } - } else { - seq++; + Put finalBarrier = MetaTableAccessor.makeBarrierPut( + Bytes.toBytes(regionEncodedName.get(i)), + seq, + regionToClose.getTableDesc().getTableName().getName()); + MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier); } - Put finalBarrier = MetaTableAccessor.makeBarrierPut(Bytes.toBytes(parentRegionEncodedName), - seq, parentRegion.getTableDesc().getTableName().getName()); - MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier); + // Offline the region + this.removeFromOnlineRegions(regionToClose, null); } - // Offline the region - this.removeFromOnlineRegions(parentRegion, null); } return true; }