HBASE-19216 Implement a general framework to execute remote procedure on RS
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3cc7ac6f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3cc7ac6f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3cc7ac6f Branch: refs/heads/HBASE-19397-branch-2 Commit: 3cc7ac6f8d00ca8f8aef3b1f4251d8e9d45368c2 Parents: 814d08a Author: zhangduo <zhang...@apache.org> Authored: Fri Dec 15 21:06:44 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Jan 11 17:11:23 2018 +0800 ---------------------------------------------------------------------- .../hbase/procedure2/LockedResourceType.java | 4 +- .../procedure2/RemoteProcedureDispatcher.java | 23 +- .../src/main/protobuf/Admin.proto | 9 +- .../src/main/protobuf/MasterProcedure.proto | 30 +++ .../src/main/protobuf/RegionServerStatus.proto | 15 ++ .../apache/hadoop/hbase/executor/EventType.java | 26 ++- .../hadoop/hbase/executor/ExecutorType.java | 3 +- .../org/apache/hadoop/hbase/master/HMaster.java | 33 ++- .../hadoop/hbase/master/MasterRpcServices.java | 13 ++ .../assignment/RegionTransitionProcedure.java | 18 +- .../procedure/MasterProcedureScheduler.java | 224 +++++++++++++------ .../procedure/PeerProcedureInterface.java | 34 +++ .../master/procedure/RSProcedureDispatcher.java | 101 +++++---- .../master/replication/ModifyPeerProcedure.java | 127 +++++++++++ .../master/replication/RefreshPeerCallable.java | 67 ++++++ .../replication/RefreshPeerProcedure.java | 197 ++++++++++++++++ .../hbase/procedure2/RSProcedureCallable.java | 43 ++++ .../hbase/regionserver/HRegionServer.java | 90 ++++++-- .../hbase/regionserver/RSRpcServices.java | 56 +++-- .../handler/RSProcedureHandler.java | 51 +++++ .../assignment/TestAssignmentManager.java | 20 +- .../replication/DummyModifyPeerProcedure.java | 41 ++++ .../TestDummyModifyPeerProcedure.java | 80 +++++++ .../security/access/TestAccessController.java | 1 + 24 files changed, 1122 insertions(+), 184 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java index c5fe62b..dc9b5d4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,5 +22,5 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public enum LockedResourceType { - SERVER, NAMESPACE, TABLE, REGION + SERVER, NAMESPACE, TABLE, REGION, PEER } http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 71932b8..78c49fb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -226,13 +226,30 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable /** * Remote procedure reference. 
- * @param <TEnv> - * @param <TRemote> */ public interface RemoteProcedure<TEnv, TRemote> { + /** + * For building the remote operation. + */ RemoteOperation remoteCallBuild(TEnv env, TRemote remote); - void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response); + + /** + * Called when the executeProcedure call is failed. + */ void remoteCallFailed(TEnv env, TRemote remote, IOException exception); + + /** + * Called when RS tells the remote procedure is succeeded through the + * {@code reportProcedureDone} method. + */ + void remoteOperationCompleted(TEnv env); + + /** + * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone} + * method. + * @param error the error message + */ + void remoteOperationFailed(TEnv env, String error); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-protocol-shaded/src/main/protobuf/Admin.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index 118c79b..ddcc266 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -256,14 +256,19 @@ message ClearRegionBlockCacheResponse { required CacheEvictionStats stats = 1; } +message RemoteProcedureRequest { + required uint64 proc_id = 1; + required string proc_class = 2; + optional bytes proc_data = 3; +} + message ExecuteProceduresRequest { repeated OpenRegionRequest open_region = 1; repeated CloseRegionRequest close_region = 2; + repeated RemoteProcedureRequest proc = 3; } message ExecuteProceduresResponse { - repeated OpenRegionResponse open_region = 1; - repeated CloseRegionResponse close_region = 2; } service AdminService { http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto 
---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index f9b8807..0e2bdba 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -365,3 +365,33 @@ message GCMergedRegionsStateData { required RegionInfo parent_b = 2; required RegionInfo merged_child = 3; } + +enum PeerModificationState { + UPDATE_PEER_STORAGE = 1; + REFRESH_PEER_ON_RS = 2; + POST_PEER_MODIFICATION = 3; +} + +message PeerModificationStateData { + required string peer_id = 1; +} + +enum PeerModificationType { + ADD_PEER = 1; + REMOVE_PEER = 2; + ENABLE_PEER = 3; + DISABLE_PEER = 4; + UPDATE_PEER_CONFIG = 5; +} + +message RefreshPeerStateData { + required string peer_id = 1; + required PeerModificationType type = 2; + required ServerName target_server = 3; +} + +message RefreshPeerParameter { + required string peer_id = 1; + required PeerModificationType type = 2; + required ServerName target_server = 3; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto index f83bb20..eb396ac 100644 --- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto @@ -143,7 +143,19 @@ message RegionSpaceUseReportRequest { } message RegionSpaceUseReportResponse { +} +message ReportProcedureDoneRequest { + required uint64 proc_id = 1; + enum Status { + SUCCESS = 1; + ERROR = 2; + } + required Status status = 2; + optional string error = 3; +} + +message ReportProcedureDoneResponse { } 
service RegionServerStatusService { @@ -181,4 +193,7 @@ service RegionServerStatusService { */ rpc ReportRegionSpaceUse(RegionSpaceUseReportRequest) returns(RegionSpaceUseReportResponse); + + rpc ReportProcedureDone(ReportProcedureDoneRequest) + returns(ReportProcedureDoneResponse); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 26fb63a..922deb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.executor; import org.apache.yetus.audience.InterfaceAudience; /** - * List of all HBase event handler types. Event types are named by a - * convention: event type names specify the component from which the event - * originated and then where its destined -- e.g. RS2ZK_ prefix means the - * event came from a regionserver destined for zookeeper -- and then what - * the even is; e.g. REGION_OPENING. - * - * <p>We give the enums indices so we can add types later and keep them - * grouped together rather than have to add them always to the end as we - * would have to if we used raw enum ordinals. + * List of all HBase event handler types. + * <p> + * Event types are named by a convention: event type names specify the component from which the + * event originated and then where its destined -- e.g. RS_ZK_ prefix means the event came from a + * regionserver destined for zookeeper -- and then what the event is; e.g. REGION_OPENING. 
+ * <p> + * We give the enums indices so we can add types later and keep them grouped together rather than + * have to add them always to the end as we would have to if we used raw enum ordinals. */ @InterfaceAudience.Private public enum EventType { @@ -275,7 +274,14 @@ public enum EventType { * * RS_COMPACTED_FILES_DISCHARGER */ - RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER); + RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER), + + /** + * RS refresh peer.<br> + * + * RS_REFRESH_PEER + */ + RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER); private final int code; private final ExecutorType executor; http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index c75a0a9..7f130d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -46,7 +46,8 @@ public enum ExecutorType { RS_LOG_REPLAY_OPS (27), RS_REGION_REPLICA_FLUSH_OPS (28), RS_COMPACTED_FILES_DISCHARGER (29), - RS_OPEN_PRIORITY_REGION (30); + RS_OPEN_PRIORITY_REGION (30), + RS_REFRESH_PEER (31); ExecutorType(int value) { } http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 945f54d..6f5cdc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -140,6 +139,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver; @@ -328,8 +328,7 @@ public class HMaster extends HRegionServer implements MasterServices { // flag set after we become the active master (used for testing) private volatile boolean activeMaster = false; - // flag set after we complete initialization once active, - // it is not private since it's used in unit tests + // flag set after we complete initialization once active private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); // flag set after master services are started, @@ -3525,4 +3524,28 @@ public class HMaster extends HRegionServer implements MasterServices { public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() { return this.spaceQuotaSnapshotNotifier; } -} + + @SuppressWarnings("unchecked") + private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) { + Procedure<?> procedure = procedureExecutor.getProcedure(procId); + if (procedure == null) { + return null; + } + assert procedure instanceof RemoteProcedure; + return (RemoteProcedure<MasterProcedureEnv, ?>) procedure; + } + + public void remoteProcedureCompleted(long procId) { + RemoteProcedure<MasterProcedureEnv, ?> procedure = 
getRemoteProcedure(procId); + if (procedure != null) { + procedure.remoteOperationCompleted(procedureExecutor.getEnvironment()); + } + } + + public void remoteProcedureFailed(long procId, String error) { + RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId); + if (procedure != null) { + procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 907ca9b..f875e20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -264,6 +264,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; @@ -2247,4 +2249,15 @@ public class MasterRpcServices extends RSRpcServices } return response.build(); } + + @Override + public ReportProcedureDoneResponse reportProcedureDone(RpcController controller, + ReportProcedureDoneRequest request) throws ServiceException { + if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) { + master.remoteProcedureCompleted(request.getProcId()); + } else { + master.remoteProcedureFailed(request.getProcId(), request.getError()); + } + return ReportProcedureDoneResponse.getDefaultInstance(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index 4a88e3b..04dccc4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -173,12 +173,6 @@ public abstract class RegionTransitionProcedure RegionStateNode regionNode, IOException exception); @Override - public void remoteCallCompleted(final MasterProcedureEnv env, - final ServerName serverName, final RemoteOperation response) { - // Ignore the response? reportTransition() is the one that count? - } - - @Override public void remoteCallFailed(final MasterProcedureEnv env, final ServerName serverName, final IOException exception) { final RegionStateNode regionNode = getRegionState(env); @@ -413,4 +407,16 @@ public abstract class RegionTransitionProcedure * @return ServerName the Assign or Unassign is going against. 
*/ public abstract ServerName getServer(final MasterProcedureEnv env); + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + // should not be called for region operation until we modified the open/close region procedure + throw new UnsupportedOperationException(); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, String error) { + // should not be called for region operation until we modified the open/close region procedure + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index dc9c69d..8ff2d12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -24,7 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.locking.LockProcedure; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; import 
org.apache.hadoop.hbase.procedure2.LockAndQueue; @@ -109,12 +110,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { new ServerQueueKeyComparator(); private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR = new TableQueueKeyComparator(); + private final static PeerQueueKeyComparator PEER_QUEUE_KEY_COMPARATOR = + new PeerQueueKeyComparator(); private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); private final FairQueue<TableName> tableRunQueue = new FairQueue<>(); + private final FairQueue<String> peerRunQueue = new FairQueue<>(); private final ServerQueue[] serverBuckets = new ServerQueue[128]; private TableQueue tableMap = null; + private PeerQueue peerMap = null; + private final SchemaLocking locking = new SchemaLocking(); /** @@ -161,6 +167,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); } else if (isServerProcedure(proc)) { doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + } else if (isPeerProcedure(proc)) { + doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); } else { // TODO: at the moment we only have Table and Server procedures // if you are implementing a non-table/non-server procedure, you have two options: create @@ -172,7 +180,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq, - final Queue<T> queue, final Procedure proc, final boolean addFront) { + final Queue<T> queue, final Procedure<?> proc, final boolean addFront) { queue.add(proc, addFront); if (!queue.getLockStatus().hasExclusiveLock() || queue.getLockStatus().isLockOwner(proc.getProcId())) { // if the queue was not remove for an xlock execution @@ -189,7 +197,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { @Override protected boolean queueHasRunnables() { 
- return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables(); + return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() || + peerRunQueue.hasRunnables(); } @Override @@ -197,7 +206,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // For now, let server handling have precedence over table handling; presumption is that it // is more important handling crashed servers than it is running the // enabling/disabling tables, etc. - Procedure pollResult = doPoll(serverRunQueue); + Procedure<?> pollResult = doPoll(serverRunQueue); + if (pollResult == null) { + pollResult = doPoll(peerRunQueue); + } if (pollResult == null) { pollResult = doPoll(tableRunQueue); } @@ -267,60 +279,30 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures); } + private <T> void addToLockedResources(List<LockedResource> lockedResources, + Map<T, LockAndQueue> locks, Function<T, String> keyTransformer, + LockedResourceType resourcesType) { + locks.entrySet().stream().filter(e -> e.getValue().isLocked()) + .map( + e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue())) + .forEachOrdered(lockedResources::add); + } + @Override public List<LockedResource> getLocks() { schedLock(); - try { List<LockedResource> lockedResources = new ArrayList<>(); - - for (Entry<ServerName, LockAndQueue> entry : locking.serverLocks - .entrySet()) { - String serverName = entry.getKey().getServerName(); - LockAndQueue queue = entry.getValue(); - - if (queue.isLocked()) { - LockedResource lockedResource = - createLockedResource(LockedResourceType.SERVER, serverName, queue); - lockedResources.add(lockedResource); - } - } - - for (Entry<String, LockAndQueue> entry : locking.namespaceLocks - .entrySet()) { - String namespaceName = entry.getKey(); - LockAndQueue queue = entry.getValue(); - - if (queue.isLocked()) { - LockedResource lockedResource = - 
createLockedResource(LockedResourceType.NAMESPACE, namespaceName, queue); - lockedResources.add(lockedResource); - } - } - - for (Entry<TableName, LockAndQueue> entry : locking.tableLocks - .entrySet()) { - String tableName = entry.getKey().getNameAsString(); - LockAndQueue queue = entry.getValue(); - - if (queue.isLocked()) { - LockedResource lockedResource = - createLockedResource(LockedResourceType.TABLE, tableName, queue); - lockedResources.add(lockedResource); - } - } - - for (Entry<String, LockAndQueue> entry : locking.regionLocks.entrySet()) { - String regionName = entry.getKey(); - LockAndQueue queue = entry.getValue(); - - if (queue.isLocked()) { - LockedResource lockedResource = - createLockedResource(LockedResourceType.REGION, regionName, queue); - lockedResources.add(lockedResource); - } - } - + addToLockedResources(lockedResources, locking.serverLocks, sn -> sn.getServerName(), + LockedResourceType.SERVER); + addToLockedResources(lockedResources, locking.namespaceLocks, Function.identity(), + LockedResourceType.NAMESPACE); + addToLockedResources(lockedResources, locking.tableLocks, tn -> tn.getNameAsString(), + LockedResourceType.TABLE); + addToLockedResources(lockedResources, locking.regionLocks, Function.identity(), + LockedResourceType.REGION); + addToLockedResources(lockedResources, locking.peerLocks, Function.identity(), + LockedResourceType.PEER); return lockedResources; } finally { schedUnlock(); @@ -328,8 +310,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } @Override - public LockedResource getLockResource(LockedResourceType resourceType, - String resourceName) { + public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { LockAndQueue queue = null; schedLock(); try { @@ -346,8 +327,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { case REGION: queue = locking.regionLocks.get(resourceName); break; + case PEER: + queue = 
locking.peerLocks.get(resourceName); + break; } - return queue != null ? createLockedResource(resourceType, resourceName, queue) : null; } finally { schedUnlock(); @@ -431,6 +414,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { markTableAsDeleted(iProcTable.getTableName(), proc); return; } + } else if (proc instanceof PeerProcedureInterface) { + PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc; + if (iProcPeer.getPeerOperationType() == PeerOperationType.REMOVE) { + removePeerQueue(iProcPeer.getPeerId()); + } } else { // No cleanup for ServerProcedureInterface types, yet. return; @@ -468,12 +456,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { locking.removeTableLock(tableName); } - - private static boolean isTableProcedure(Procedure proc) { + private static boolean isTableProcedure(Procedure<?> proc) { return proc instanceof TableProcedureInterface; } - private static TableName getTableName(Procedure proc) { + private static TableName getTableName(Procedure<?> proc) { return ((TableProcedureInterface)proc).getTableName(); } @@ -494,15 +481,42 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return Math.abs(hashCode) % buckets.length; } - private static boolean isServerProcedure(Procedure proc) { + private static boolean isServerProcedure(Procedure<?> proc) { return proc instanceof ServerProcedureInterface; } - private static ServerName getServerName(Procedure proc) { + private static ServerName getServerName(Procedure<?> proc) { return ((ServerProcedureInterface)proc).getServerName(); } // ============================================================================ + // Peer Queue Lookup Helpers + // ============================================================================ + private PeerQueue getPeerQueue(String peerId) { + PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); + if (node != null) { + return node; + } + node = new 
PeerQueue(peerId, locking.getPeerLock(peerId)); + peerMap = AvlTree.insert(peerMap, node); + return node; + } + + private void removePeerQueue(String peerId) { + peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); + locking.removePeerLock(peerId); + } + + + private static boolean isPeerProcedure(Procedure<?> proc) { + return proc instanceof PeerProcedureInterface; + } + + private static String getPeerId(Procedure<?> proc) { + return ((PeerProcedureInterface) proc).getPeerId(); + } + + // ============================================================================ // Table and Server Queue Implementation // ============================================================================ private static class ServerQueueKeyComparator implements AvlKeyComparator<ServerQueue> { @@ -571,6 +585,26 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } + private static class PeerQueueKeyComparator implements AvlKeyComparator<PeerQueue> { + + @Override + public int compareKey(PeerQueue node, Object key) { + return node.compareKey((String) key); + } + } + + public static class PeerQueue extends Queue<String> { + + public PeerQueue(String peerId, LockStatus lockStatus) { + super(peerId, lockStatus); + } + + @Override + public boolean requireExclusiveLock(Procedure proc) { + return requirePeerExclusiveLock((PeerProcedureInterface) proc); + } + } + // ============================================================================ // Table Locking Helpers // ============================================================================ @@ -958,7 +992,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param serverName Server to lock * @return true if the procedure has to wait for the server to be available */ - public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) { + public boolean waitServerExclusiveLock(final Procedure<?> procedure, + final ServerName serverName) { 
schedLock(); try { final LockAndQueue lock = locking.getServerLock(serverName); @@ -980,7 +1015,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param serverName the server that has the exclusive lock */ - public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) { + public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) { schedLock(); try { final LockAndQueue lock = locking.getServerLock(serverName); @@ -994,6 +1029,56 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } // ============================================================================ + // Peer Locking Helpers + // ============================================================================ + + private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { + return proc.getPeerOperationType() != PeerOperationType.REFRESH; + } + + /** + * Try to acquire the exclusive lock on the specified peer. 
+ * @see #wakePeerExclusiveLock(Procedure, String) + * @param procedure the procedure trying to acquire the lock + * @param peerId peer to lock + * @return true if the procedure has to wait for the peer to be available + */ + public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) { + schedLock(); + try { + final LockAndQueue lock = locking.getPeerLock(peerId); + if (lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(peerRunQueue, getPeerQueue(peerId)); + return false; + } + waitProcedure(lock, procedure); + logLockedResource(LockedResourceType.PEER, peerId); + return true; + } finally { + schedUnlock(); + } + } + + /** + * Wake the procedures waiting for the specified peer + * @see #waitPeerExclusiveLock(Procedure, String) + * @param procedure the procedure releasing the lock + * @param peerId the peer that has the exclusive lock + */ + public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) { + schedLock(); + try { + final LockAndQueue lock = locking.getPeerLock(peerId); + lock.releaseExclusiveLock(procedure); + addToRunQueue(peerRunQueue, getPeerQueue(peerId)); + int waitingCount = wakeWaitingProcedures(lock); + wakePollIfNeeded(waitingCount); + } finally { + schedUnlock(); + } + } + + // ============================================================================ + // Generic Helpers + // ============================================================================ private static abstract class Queue<TKey extends Comparable<TKey>> @@ -1098,6 +1183,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final Map<TableName, LockAndQueue> tableLocks = new HashMap<>(); // Single map for all regions irrespective of tables. Key is encoded region name. 
final Map<String, LockAndQueue> regionLocks = new HashMap<>(); + final Map<String, LockAndQueue> peerLocks = new HashMap<>(); private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) { LockAndQueue lock = map.get(key); @@ -1132,6 +1218,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return getLock(serverLocks, serverName); } + LockAndQueue getPeerLock(String peerId) { + return getLock(peerLocks, peerId); + } + + LockAndQueue removePeerLock(String peerId) { + return peerLocks.remove(peerId); + } + /** * Removes all locks by clearing the maps. * Used when procedure executor is stopped for failure and recovery testing. @@ -1142,6 +1236,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { namespaceLocks.clear(); tableLocks.clear(); regionLocks.clear(); + peerLocks.clear(); } @Override @@ -1149,7 +1244,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return "serverLocks=" + filterUnlocked(this.serverLocks) + ", namespaceLocks=" + filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) + - ", regionLocks=" + filterUnlocked(this.regionLocks); + ", regionLocks=" + filterUnlocked(this.regionLocks) + + ", peerLocks=" + filterUnlocked(this.peerLocks); } private String filterUnlocked(Map<?, LockAndQueue> locks) { http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java new file mode 100644 index 0000000..4abc9ad --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface PeerProcedureInterface { + + enum PeerOperationType { + ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH + } + + String getPeerId(); + + PeerOperationType getPeerOperationType(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index 65c4d08..0f68f31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import 
java.util.concurrent.TimeUnit; - import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -36,10 +35,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -49,6 +45,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; + +import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; /** * A remote procecdure dispatcher for regionservers. 
@@ -222,7 +225,10 @@ public class RSProcedureDispatcher private interface RemoteProcedureResolver { void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations); + void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); + + void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations); } /** @@ -231,22 +237,28 @@ public class RSProcedureDispatcher * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and * {@link RegionCloseOperation}s. * @param serverName RegionServer to which the remote operations are sent - * @param remoteProcedures Remote procedures which are dispatched to the given server + * @param operations Remote procedures which are dispatched to the given server * @param resolver Used to dispatch remote procedures to given server. */ - public void splitAndResolveOperation(final ServerName serverName, - final Set<RemoteProcedure> remoteProcedures, final RemoteProcedureResolver resolver) { - final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = - buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures); + public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations, + RemoteProcedureResolver resolver) { + MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = + buildAndGroupRequestByType(env, serverName, operations); - final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); + List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); if (!openOps.isEmpty()) { - resolver.dispatchOpenRequests(procedureEnv, openOps); + resolver.dispatchOpenRequests(env, openOps); } - final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); + List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); if 
(!closeOps.isEmpty()) { - resolver.dispatchCloseRequests(procedureEnv, closeOps); + resolver.dispatchCloseRequests(env, closeOps); + } + + List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class); + if (!refreshOps.isEmpty()) { + resolver.dispatchServerOperations(env, refreshOps); } if (!reqsByType.isEmpty()) { @@ -277,8 +289,7 @@ public class RSProcedureDispatcher splitAndResolveOperation(getServerName(), remoteProcedures, this); try { - final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build()); - remoteCallCompleted(procedureEnv, response); + sendRequest(getServerName(), request.build()); } catch (IOException e) { e = unwrapException(e); // TODO: In the future some operation may want to bail out early. @@ -302,6 +313,11 @@ public class RSProcedureDispatcher } } + @Override + public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) { + operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc); + } + protected ExecuteProceduresResponse sendRequest(final ServerName serverName, final ExecuteProceduresRequest request) throws IOException { try { @@ -311,17 +327,8 @@ public class RSProcedureDispatcher } } - - private void remoteCallCompleted(final MasterProcedureEnv env, - final ExecuteProceduresResponse response) { - /* - for (RemoteProcedure proc: operations) { - proc.remoteCallCompleted(env, getServerName(), response); - }*/ - } - private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { - for (RemoteProcedure proc: remoteProcedures) { + for (RemoteProcedure proc : remoteProcedures) { proc.remoteCallFailed(env, getServerName(), e); } } @@ -362,8 +369,7 @@ public class RSProcedureDispatcher buildOpenRegionRequest(procedureEnv, getServerName(), operations); try { - OpenRegionResponse response = sendRequest(getServerName(), request); - remoteCallCompleted(procedureEnv, response); + sendRequest(getServerName(), request); } catch 
(IOException e) { e = unwrapException(e); // TODO: In the future some operation may want to bail out early. @@ -384,16 +390,6 @@ public class RSProcedureDispatcher } } - private void remoteCallCompleted(final MasterProcedureEnv env, - final OpenRegionResponse response) { - int index = 0; - for (RegionOpenOperation op: operations) { - OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++); - op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING); - op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op); - } - } - private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { for (RegionOpenOperation op: operations) { op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); @@ -443,7 +439,6 @@ public class RSProcedureDispatcher private void remoteCallCompleted(final MasterProcedureEnv env, final CloseRegionResponse response) { operation.setClosed(response.getClosed()); - operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation); } private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { @@ -482,6 +477,11 @@ public class RSProcedureDispatcher submitTask(new CloseRegionRemoteCall(serverName, op)); } } + + @Override + public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) { + throw new UnsupportedOperationException(); + } } // ========================================================================== @@ -489,13 +489,28 @@ public class RSProcedureDispatcher // - ServerOperation: refreshConfig, grant, revoke, ... (TODO) // - RegionOperation: open, close, flush, snapshot, ... 
// ========================================================================== - /* Currently unused - public static abstract class ServerOperation extends RemoteOperation { - protected ServerOperation(final RemoteProcedure remoteProcedure) { + + public static final class ServerOperation extends RemoteOperation { + + private final long procId; + + private final Class<?> rsProcClass; + + private final byte[] rsProcData; + + public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass, + byte[] rsProcData) { super(remoteProcedure); + this.procId = procId; + this.rsProcClass = rsProcClass; + this.rsProcData = rsProcData; + } + + public RemoteProcedureRequest buildRequest() { + return RemoteProcedureRequest.newBuilder().setProcId(procId) + .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build(); } } - */ public static abstract class RegionOperation extends RemoteOperation { private final RegionInfo regionInfo; http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java new file mode 100644 index 0000000..fca05a7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; + +@InterfaceAudience.Private +public abstract class ModifyPeerProcedure + extends StateMachineProcedure<MasterProcedureEnv, PeerModificationState> + implements PeerProcedureInterface { + + private static final Log LOG = LogFactory.getLog(ModifyPeerProcedure.class); + + protected String peerId; + + protected ModifyPeerProcedure() { + } + + protected ModifyPeerProcedure(String peerId) { + this.peerId = peerId; + } + + @Override + public String getPeerId() { + return peerId; + } + + /** + * Return {@code false} means that the operation is invalid and we should give up, otherwise + * {@code true}. + * <p> + * You need to call {@link #setFailure(String, Throwable)} to give the detail failure information. 
+ */ + protected abstract boolean updatePeerStorage() throws IOException; + + protected void postPeerModification() { + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case UPDATE_PEER_STORAGE: + try { + if (!updatePeerStorage()) { + assert isFailed() : "setFailure is not called"; + return Flow.NO_MORE_STATE; + } + } catch (IOException e) { + LOG.warn("update peer storage failed, retry", e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.REFRESH_PEER_ON_RS); + return Flow.HAS_MORE_STATE; + case REFRESH_PEER_ON_RS: + addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn)) + .toArray(RefreshPeerProcedure[]::new)); + setNextState(PeerModificationState.POST_PEER_MODIFICATION); + return Flow.HAS_MORE_STATE; + case POST_PEER_MODIFICATION: + postPeerModification(); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + return env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId) + ? 
LockState.LOCK_EVENT_WAIT + : LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId); + } + + @Override + protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) + throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected PeerModificationState getState(int stateId) { + return PeerModificationState.forNumber(stateId); + } + + @Override + protected int getStateId(PeerModificationState state) { + return state.getNumber(); + } + + @Override + protected PeerModificationState getInitialState() { + return PeerModificationState.UPDATE_PEER_STORAGE; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java new file mode 100644 index 0000000..4e09107 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; + +/** + * The callable executed at RS side to refresh the peer config/state. + * <p> + * TODO: only a dummy implementation for verifying the framework, will add implementation later. 
+ */ +@InterfaceAudience.Private +public class RefreshPeerCallable implements RSProcedureCallable { + + private HRegionServer rs; + + private String peerId; + + private Exception initError; + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + rs.getFileSystem().create(new Path("/" + peerId + "/" + rs.getServerName().toString())).close(); + return null; + } + + @Override + public void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + try { + this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId(); + } catch (InvalidProtocolBufferException e) { + initError = e; + return; + } + } + + @Override + public EventType getEventType() { + return EventType.RS_REFRESH_PEER; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java new file mode 100644 index 0000000..18da487 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData; + +@InterfaceAudience.Private +public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> + implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> { + + private static 
final Log LOG = LogFactory.getLog(RefreshPeerProcedure.class); + + private String peerId; + + private PeerOperationType type; + + private ServerName targetServer; + + private boolean dispatched; + + private ProcedureEvent<?> event; + + private boolean succ; + + public RefreshPeerProcedure() { + } + + public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) { + this.peerId = peerId; + this.type = type; + this.targetServer = targetServer; + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.REFRESH; + } + + private static PeerModificationType toPeerModificationType(PeerOperationType type) { + switch (type) { + case ADD: + return PeerModificationType.ADD_PEER; + case REMOVE: + return PeerModificationType.REMOVE_PEER; + case ENABLE: + return PeerModificationType.ENABLE_PEER; + case DISABLE: + return PeerModificationType.DISABLE_PEER; + case UPDATE_CONFIG: + return PeerModificationType.UPDATE_PEER_CONFIG; + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + private static PeerOperationType toPeerOperationType(PeerModificationType type) { + switch (type) { + case ADD_PEER: + return PeerOperationType.ADD; + case REMOVE_PEER: + return PeerOperationType.REMOVE; + case ENABLE_PEER: + return PeerOperationType.ENABLE; + case DISABLE_PEER: + return PeerOperationType.DISABLE; + case UPDATE_PEER_CONFIG: + return PeerOperationType.UPDATE_CONFIG; + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + @Override + public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) { + assert targetServer.equals(remote); + return new ServerOperation(this, getProcId(), RefreshPeerCallable.class, + RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) + .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); + } + 
+ private void complete(MasterProcedureEnv env, boolean succ) { + if (event == null) { + LOG.warn("procedure event for " + getProcId() + + " is null, maybe the procedure is created when recovery", new Exception()); + return; + } + LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + + (succ ? " suceeded" : " failed")); + this.succ = succ; + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote, + IOException exception) { + complete(env, false); + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, true); + } + + @Override + public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) { + complete(env, false); + } + + @Override + protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + // retry + dispatched = false; + } + if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) { + LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type + + " to " + targetServer + ", this usually because the server is already dead," + + " give up and mark the procedure as complete"); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + // TODO: no correctness problem if we just ignore this, implement later. 
+ return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + serializer.serialize( + RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) + .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + RefreshPeerStateData data = serializer.deserialize(RefreshPeerStateData.class); + peerId = data.getPeerId(); + type = toPeerOperationType(data.getType()); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java new file mode 100644 index 0000000..62c2e36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A general interface for a sub procedure runs at RS side. + */ +@InterfaceAudience.Private +public interface RSProcedureCallable extends Callable<Void> { + + /** + * Initialize the callable + * @param parameter the parameter passed from master. + * @param rs the regionserver instance + */ + void init(byte[] parameter, HRegionServer rs); + + /** + * Event type used to select thread pool. + */ + EventType getEventType(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 53390bd..fc316b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.servlet.http.HttpServlet; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.MemoryType; @@ -50,7 +47,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; - +import javax.management.MalformedObjectNameException; +import 
javax.management.ObjectName; +import javax.servlet.http.HttpServlet; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.SystemUtils; import org.apache.hadoop.conf.Configuration; @@ -117,6 +116,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.mob.MobCacheConfig; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; @@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; +import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @@ -173,14 +174,9 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.Maps; -import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; -import 
org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import sun.misc.Signal; +import sun.misc.SignalHandler; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; @@ -206,12 +202,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; -import sun.misc.Signal; -import sun.misc.SignalHandler; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; +import 
org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; /** * HRegionServer makes a set of HRegions available to clients. It checks in with @@ -1932,6 +1936,8 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.regionserver.region.replica.flusher.threads", conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); } + this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, + conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); @@ -3713,4 +3719,60 @@ public class HRegionServer extends HasThread implements return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, this.rpcServices, this.rpcServices); } + + public void executeProcedure(long procId, RSProcedureCallable callable) { + executorService.submit(new RSProcedureHandler(this, procId, callable)); + } + + public void reportProcedureDone(long procId, Throwable error) { + ReportProcedureDoneRequest.Builder builder = + ReportProcedureDoneRequest.newBuilder().setProcId(procId); + if (error != null) { + builder.setStatus(ReportProcedureDoneRequest.Status.ERROR) + .setError(Throwables.getStackTraceAsString(error)); + } else { + builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS); + } + ReportProcedureDoneRequest request = builder.build(); + int tries = 0; + long pauseTime = INIT_PAUSE_TIME_MS; + while (keepLooping()) { + RegionServerStatusService.BlockingInterface rss = rssStub; + try { + if (rss == null) { + createRegionServerStatusStub(); + continue; + } + rss.reportProcedureDone(null, request); + // Log if we had to retry else don't log unless TRACE. We want to + // know if were successful after an attempt showed in logs as failed. 
+ if (tries > 0 || LOG.isTraceEnabled()) { + LOG.info("PROCEDURE REPORTED " + request); + } + return; + } catch (ServiceException se) { + IOException ioe = ProtobufUtil.getRemoteException(se); + boolean pause = + ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException; + if (pause) { + // Do backoff else we flood the Master with requests. + pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries); + } else { + pauseTime = INIT_PAUSE_TIME_MS; // Reset. + } + LOG.info( + "Failed to report transition " + TextFormat.shortDebugString(request) + "; retry (#" + + tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)." + : " immediately."), + ioe); + if (pause) { + Threads.sleep(pauseTime); + } + tries++; + if (rssStub == rss) { + rssStub = null; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b6c0ebe..e88f70e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -100,6 +99,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.QuotaUtil; @@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionR import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; @@ -3435,23 +3436,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } @Override - public ExecuteProceduresResponse executeProcedures(RpcController controller, - ExecuteProceduresRequest request) throws ServiceException { - ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); - if (request.getOpenRegionCount() > 0) { - for (OpenRegionRequest req : request.getOpenRegionList()) { - builder.addOpenRegion(openRegion(controller, req)); - } - } - if (request.getCloseRegionCount() > 0) { - for (CloseRegionRequest req : request.getCloseRegionList()) { - builder.addCloseRegion(closeRegion(controller, req)); - } - } - return 
builder.build(); - } - - @Override public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, ClearRegionBlockCacheRequest request) { ClearRegionBlockCacheResponse.Builder builder = @@ -3468,4 +3452,38 @@ public class RSRpcServices implements HBaseRPCErrorHandler, stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize()); return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); } + + @Override + public ExecuteProceduresResponse executeProcedures(RpcController controller, + ExecuteProceduresRequest request) throws ServiceException { + if (request.getOpenRegionCount() > 0) { + for (OpenRegionRequest req : request.getOpenRegionList()) { + openRegion(controller, req); + } + } + if (request.getCloseRegionCount() > 0) { + for (CloseRegionRequest req : request.getCloseRegionList()) { + closeRegion(controller, req); + } + } + if (request.getProcCount() > 0) { + for (RemoteProcedureRequest req : request.getProcList()) { + RSProcedureCallable callable; + try { + callable = + Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class).newInstance(); + } catch (Exception e) { + // here we just ignore the error as this should not happen and we do not provide a general + // way to report errors for all types of remote procedure. The procedure will hang at + // master side but after you solve the problem and restart master it will be executed + // again and pass. 
+ LOG.warn("create procedure of type " + req.getProcClass() + " failed, give up", e); + continue; + } + callable.init(req.getProcData().toByteArray(), regionServer); + regionServer.executeProcedure(req.getProcId(), callable); + } + } + return ExecuteProceduresResponse.getDefaultInstance(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java new file mode 100644 index 0000000..94bcfec --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.handler; + +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A event handler for running procedure. + */ +@InterfaceAudience.Private +public class RSProcedureHandler extends EventHandler { + + private final long procId; + + private final RSProcedureCallable callable; + + public RSProcedureHandler(HRegionServer rs, long procId, RSProcedureCallable callable) { + super(rs, callable.getEventType()); + this.procId = procId; + this.callable = callable; + } + + @Override + public void process() { + Exception error = null; + try { + callable.call(); + } catch (Exception e) { + error = e; + } + ((HRegionServer) server).reportProcedureDone(procId, error); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 3c453bc..3ab915b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -531,26 +531,16 @@ public class TestAssignmentManager { @Override public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest request) throws IOException { - ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); if (request.getOpenRegionCount() > 0) { - for (OpenRegionRequest req: request.getOpenRegionList()) { - OpenRegionResponse.Builder resp = 
OpenRegionResponse.newBuilder(); - for (RegionOpenInfo openReq: req.getOpenInfoList()) { - RegionOpeningState state = execOpenRegion(server, openReq); - if (state != null) { - resp.addOpeningState(state); - } + for (OpenRegionRequest req : request.getOpenRegionList()) { + for (RegionOpenInfo openReq : req.getOpenInfoList()) { + execOpenRegion(server, openReq); } - builder.addOpenRegion(resp.build()); } } if (request.getCloseRegionCount() > 0) { - for (CloseRegionRequest req: request.getCloseRegionList()) { - CloseRegionResponse resp = execCloseRegion(server, - req.getRegion().getValue().toByteArray()); - if (resp != null) { - builder.addCloseRegion(resp); - } + for (CloseRegionRequest req : request.getCloseRegionList()) { + execCloseRegion(server, req.getRegion().getValue().toByteArray()); } } return ExecuteProceduresResponse.newBuilder().build(); http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java new file mode 100644 index 0000000..44343d7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +public class DummyModifyPeerProcedure extends ModifyPeerProcedure { + + public DummyModifyPeerProcedure() { + } + + public DummyModifyPeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ADD; + } + + @Override + protected boolean updatePeerStorage() throws IOException { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java new file mode 100644 index 0000000..ec06306 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.junit.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestDummyModifyPeerProcedure { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID; + + private static Path DIR; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(3); + PEER_ID = "testPeer"; + DIR = new Path("/" + PEER_ID); + UTIL.getTestFileSystem().mkdirs(DIR); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + ProcedureExecutor<?> executor = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + long procId = executor.submitProcedure(new DummyModifyPeerProcedure(PEER_ID)); + UTIL.waitFor(30000, new Waiter.Predicate<Exception>() { + + @Override + public boolean evaluate() throws 
Exception { + return executor.isFinished(procId); + } + }); + Set<String> serverNames = UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer().getServerName().toString()) + .collect(Collectors.toCollection(HashSet::new)); + for (FileStatus s : UTIL.getTestFileSystem().listStatus(DIR)) { + assertTrue(serverNames.remove(s.getPath().getName())); + } + assertTrue(serverNames.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc7ac6f/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index be1b0e4..99e212d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -30,6 +30,7 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; + import java.io.IOException; import java.security.PrivilegedAction; import java.util.ArrayList;