HBASE-19524 Master side changes for moving peer modification from zk watcher to procedure
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ab7a94e4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ab7a94e4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ab7a94e4 Branch: refs/heads/HBASE-19397-branch-2 Commit: ab7a94e4a1e1bcf71032f1c34692becae3c7d2f3 Parents: e9a5e66 Author: zhangduo <zhang...@apache.org> Authored: Mon Dec 18 15:22:36 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Jan 12 09:58:05 2018 +0800 ---------------------------------------------------------------------- .../procedure2/RemoteProcedureDispatcher.java | 3 +- .../src/main/protobuf/MasterProcedure.proto | 21 +++- .../src/main/protobuf/RegionServerStatus.proto | 3 +- .../src/main/protobuf/Replication.proto | 5 + .../replication/ReplicationPeersZKImpl.java | 4 +- .../org/apache/hadoop/hbase/master/HMaster.java | 100 ++++++++----------- .../hadoop/hbase/master/MasterRpcServices.java | 4 +- .../hadoop/hbase/master/MasterServices.java | 26 +++-- .../assignment/RegionTransitionProcedure.java | 13 +-- .../master/procedure/MasterProcedureEnv.java | 5 + .../master/procedure/ProcedurePrepareLatch.java | 2 +- .../master/replication/AddPeerProcedure.java | 97 ++++++++++++++++++ .../replication/DisablePeerProcedure.java | 70 +++++++++++++ .../master/replication/EnablePeerProcedure.java | 69 +++++++++++++ .../master/replication/ModifyPeerProcedure.java | 97 +++++++++++++++--- .../master/replication/RefreshPeerCallable.java | 67 ------------- .../replication/RefreshPeerProcedure.java | 28 ++++-- .../master/replication/RemovePeerProcedure.java | 69 +++++++++++++ .../master/replication/ReplicationManager.java | 76 +++++++------- .../replication/UpdatePeerConfigProcedure.java | 92 +++++++++++++++++ .../hbase/regionserver/HRegionServer.java | 6 +- .../regionserver/RefreshPeerCallable.java | 70 +++++++++++++ .../hbase/master/MockNoopMasterServices.java | 23 +++-- 
.../replication/DummyModifyPeerProcedure.java | 13 ++- 24 files changed, 737 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 78c49fb..3b925a6 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -247,9 +247,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable /** * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone} * method. 
- * @param error the error message */ - void remoteOperationFailed(TEnv env, String error); + void remoteOperationFailed(TEnv env, RemoteProcedureException error); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 0e2bdba..ae676ea 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -27,6 +27,7 @@ option optimize_for = SPEED; import "HBase.proto"; import "RPC.proto"; import "Snapshot.proto"; +import "Replication.proto"; // ============================================================================ // WARNING - Compatibility rules @@ -367,9 +368,10 @@ message GCMergedRegionsStateData { } enum PeerModificationState { - UPDATE_PEER_STORAGE = 1; - REFRESH_PEER_ON_RS = 2; - POST_PEER_MODIFICATION = 3; + PRE_PEER_MODIFICATION = 1; + UPDATE_PEER_STORAGE = 2; + REFRESH_PEER_ON_RS = 3; + POST_PEER_MODIFICATION = 4; } message PeerModificationStateData { @@ -394,4 +396,17 @@ message RefreshPeerParameter { required string peer_id = 1; required PeerModificationType type = 2; required ServerName target_server = 3; +} + +message ModifyPeerStateData { + required string peer_id = 1; +} + +message AddPeerStateData { + required ReplicationPeer peer_config = 1; + required bool enabled = 2; +} + +message UpdatePeerConfigStateData { + required ReplicationPeer peer_config = 1; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto 
b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto index eb396ac..4f75941 100644 --- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto @@ -28,6 +28,7 @@ option optimize_for = SPEED; import "HBase.proto"; import "ClusterStatus.proto"; +import "ErrorHandling.proto"; message RegionServerStartupRequest { /** Port number this regionserver is up on */ @@ -152,7 +153,7 @@ message ReportProcedureDoneRequest { ERROR = 2; } required Status status = 2; - optional string error = 3; + optional ForeignExceptionMessage error = 3; } message ReportProcedureDoneResponse { http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-protocol-shaded/src/main/protobuf/Replication.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 8657c25..9f7b4c2 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -84,6 +84,7 @@ message AddReplicationPeerRequest { } message AddReplicationPeerResponse { + optional uint64 proc_id = 1; } message RemoveReplicationPeerRequest { @@ -91,6 +92,7 @@ message RemoveReplicationPeerRequest { } message RemoveReplicationPeerResponse { + optional uint64 proc_id = 1; } message EnableReplicationPeerRequest { @@ -98,6 +100,7 @@ message EnableReplicationPeerRequest { } message EnableReplicationPeerResponse { + optional uint64 proc_id = 1; } message DisableReplicationPeerRequest { @@ -105,6 +108,7 @@ message DisableReplicationPeerRequest { } message DisableReplicationPeerResponse { + optional uint64 proc_id = 1; } message GetReplicationPeerConfigRequest { @@ -122,6 +126,7 @@ message UpdateReplicationPeerConfigRequest { } message UpdateReplicationPeerConfigResponse { + optional uint64 proc_id = 1; } message 
ListReplicationPeersRequest { http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 289d2aa..0b2f08a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -531,7 +531,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); if (queueInfo.getPeerId().equals(peerId)) { - throw new ReplicationException("undeleted queue for peerId: " + peerId + throw new IllegalArgumentException("undeleted queue for peerId: " + peerId + ", replicator: " + replicator + ", queueId: " + queueId); } } @@ -539,7 +539,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // Check for hfile-refs queue if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode) && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) { - throw new ReplicationException("Undeleted queue for peerId: " + peerId + throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId + ", found in hfile-refs node path " + hfileRefsZNode); } } catch (KeeperException e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 44a9cdf..dfa720f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master; import com.google.protobuf.Descriptors; import com.google.protobuf.Service; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.Constructor; @@ -48,10 +49,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; + import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterMetrics; @@ -126,7 +129,13 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; +import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; +import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; +import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; +import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; +import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; @@ -139,6 +148,7 @@ import 
org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver; @@ -167,7 +177,6 @@ import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EncryptionTest; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdLock; @@ -328,15 +337,15 @@ public class HMaster extends HRegionServer implements MasterServices { private volatile boolean activeMaster = false; // flag set after we complete initialization once active - private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); + private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized"); // flag set after master services are started, // initialization may have not completed yet. volatile boolean serviceStarted = false; // flag set after we complete assignMeta. 
- private final ProcedureEvent serverCrashProcessingEnabled = - new ProcedureEvent("server crash processing"); + private final ProcedureEvent<?> serverCrashProcessingEnabled = + new ProcedureEvent<>("server crash processing"); // Maximum time we should run balancer for private final int maxBlancingTime; @@ -1211,7 +1220,6 @@ public class HMaster extends HRegionServer implements MasterServices { private void startProcedureExecutor() throws IOException { final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); - final Path rootDir = FSUtils.getRootDir(conf); procedureStore = new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); @@ -2317,11 +2325,8 @@ public class HMaster extends HRegionServer implements MasterServices { return true; } Pair<RegionInfo, ServerName> pair = - new Pair(MetaTableAccessor.getRegionInfo(data), + new Pair<>(MetaTableAccessor.getRegionInfo(data), MetaTableAccessor.getServerName(data,0)); - if (pair == null) { - return false; - } if (!pair.getFirst().getTable().equals(tableName)) { return false; } @@ -2747,7 +2752,7 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ProcedureEvent getInitializedEvent() { + public ProcedureEvent<?> getInitializedEvent() { return initialized; } @@ -2766,7 +2771,7 @@ public class HMaster extends HRegionServer implements MasterServices { procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b); } - public ProcedureEvent getServerCrashProcessingEnabledEvent() { + public ProcedureEvent<?> getServerCrashProcessingEnabledEvent() { return serverCrashProcessingEnabled; } @@ -3317,54 +3322,36 @@ public class HMaster extends HRegionServer implements MasterServices { return favoredNodesManager; } + private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException { + long procId = procedureExecutor.submitProcedure(procedure); + procedure.getLatch().await(); + return procId; + } + @Override - public void 
addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preAddReplicationPeer(peerId, peerConfig); - } - LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" - + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")); - this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled); - if (cpHost != null) { - cpHost.postAddReplicationPeer(peerId, peerConfig); - } + LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")); + return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled)); } @Override - public void removeReplicationPeer(String peerId) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preRemoveReplicationPeer(peerId); - } + public long removeReplicationPeer(String peerId) throws ReplicationException, IOException { LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId); - this.replicationManager.removeReplicationPeer(peerId); - if (cpHost != null) { - cpHost.postRemoveReplicationPeer(peerId); - } + return executePeerProcedure(new RemovePeerProcedure(peerId)); } @Override - public void enableReplicationPeer(String peerId) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preEnableReplicationPeer(peerId); - } + public long enableReplicationPeer(String peerId) throws ReplicationException, IOException { LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId); - this.replicationManager.enableReplicationPeer(peerId); - if (cpHost != null) { - cpHost.postEnableReplicationPeer(peerId); - } + return executePeerProcedure(new EnablePeerProcedure(peerId)); } @Override - public void 
disableReplicationPeer(String peerId) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preDisableReplicationPeer(peerId); - } + public long disableReplicationPeer(String peerId) throws ReplicationException, IOException { LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId); - this.replicationManager.disableReplicationPeer(peerId); - if (cpHost != null) { - cpHost.postDisableReplicationPeer(peerId); - } + return executePeerProcedure(new DisablePeerProcedure(peerId)); } @Override @@ -3383,17 +3370,11 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) + public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); - } - LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId - + ", config=" + peerConfig); - this.replicationManager.updatePeerConfig(peerId, peerConfig); - if (cpHost != null) { - cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig); - } + LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId + + ", config=" + peerConfig); + return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig)); } @Override @@ -3546,10 +3527,15 @@ public class HMaster extends HRegionServer implements MasterServices { } } - public void remoteProcedureFailed(long procId, String error) { + public void remoteProcedureFailed(long procId, RemoteProcedureException error) { RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId); if (procedure != null) { procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error); } } -} \ No newline at end of file + + @Override + public ReplicationManager getReplicationManager() { + return replicationManager; + 
} +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index f875e20..8025a51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; @@ -2256,7 +2257,8 @@ public class MasterRpcServices extends RSRpcServices if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) { master.remoteProcedureCompleted(request.getProcId()); } else { - master.remoteProcedureFailed(request.getProcId(), request.getError()); + master.remoteProcedureFailed(request.getProcId(), + RemoteProcedureException.fromProto(request.getError())); } return ReportProcedureDoneResponse.getDefaultInstance(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java 
index 9786fde..e798455 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,10 +17,11 @@ */ package org.apache.hadoop.hbase.master; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -52,8 +53,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Service; - /** * A curated subset of services provided by {@link HMaster}. * For use internally only. Passed to Managers, Services and Chores so can pass less-than-a @@ -136,7 +135,7 @@ public interface MasterServices extends Server { * @return Tripped when Master has finished initialization. 
*/ @VisibleForTesting - public ProcedureEvent getInitializedEvent(); + public ProcedureEvent<?> getInitializedEvent(); /** * @return Master's instance of {@link MetricsMaster} @@ -430,26 +429,26 @@ public interface MasterServices extends Server { * @param peerConfig configuration for the replication slave cluster * @param enabled peer state, true if ENABLED and false if DISABLED */ - void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException; /** * Removes a peer and stops the replication * @param peerId a short name that identifies the peer */ - void removeReplicationPeer(String peerId) throws ReplicationException, IOException; + long removeReplicationPeer(String peerId) throws ReplicationException, IOException; /** * Restart the replication stream to the specified peer * @param peerId a short name that identifies the peer */ - void enableReplicationPeer(String peerId) throws ReplicationException, IOException; + long enableReplicationPeer(String peerId) throws ReplicationException, IOException; /** * Stop the replication stream to the specified peer * @param peerId a short name that identifies the peer */ - void disableReplicationPeer(String peerId) throws ReplicationException, IOException; + long disableReplicationPeer(String peerId) throws ReplicationException, IOException; /** * Returns the configured ReplicationPeerConfig for the specified peer @@ -460,11 +459,16 @@ public interface MasterServices extends Server { IOException; /** + * Returns the {@link ReplicationManager}. 
+ */ + ReplicationManager getReplicationManager(); + + /** * Update the peerConfig for the specified peer * @param peerId a short name that identifies the peer * @param peerConfig new config for the peer */ - void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) + long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index 04dccc4..1724a38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -16,12 +16,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -33,13 +31,16 @@ import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + /** * Base class for the Assign and Unassign Procedure. 
* @@ -415,7 +416,7 @@ public abstract class RegionTransitionProcedure } @Override - public void remoteOperationFailed(MasterProcedureEnv env, String error) { + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { // should not be called for region operation until we modified the open/close region procedure throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 0a4c97d..fa4d371 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; @@ -137,6 +138,10 @@ public class MasterProcedureEnv implements ConfigurationObserver { return remoteDispatcher; } + public ReplicationManager getReplicationManager() { + return master.getReplicationManager(); + } + public boolean isRunning() { if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false; return master.getMasterProcedureExecutor().isRunning(); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java index 09d05e6..dbea6fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java @@ -64,7 +64,7 @@ public abstract class ProcedurePrepareLatch { protected abstract void countDown(final Procedure proc); public abstract void await() throws IOException; - protected static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) { + public static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) { if (latch != null) { latch.countDown(proc); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java new file mode 100644 index 0000000..c3862d8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData; + +/** + * The procedure for adding a new replication peer. 
+ */ +@InterfaceAudience.Private +public class AddPeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(AddPeerProcedure.class); + + private ReplicationPeerConfig peerConfig; + + private boolean enabled; + + public AddPeerProcedure() { + } + + public AddPeerProcedure(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) { + super(peerId); + this.peerConfig = peerConfig; + this.enabled = enabled; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ADD; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preAddReplicationPeer(peerId, peerConfig); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully added " + (enabled ? 
"ENABLED" : "DISABLED") + " peer " + peerId + + ", config " + peerConfig); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig); + } + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(AddPeerStateData.newBuilder() + .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + AddPeerStateData data = serializer.deserialize(AddPeerStateData.class); + peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); + enabled = data.getEnabled(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java new file mode 100644 index 0000000..0b32db9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The procedure for disabling a replication peer. + */ +@InterfaceAudience.Private +public class DisablePeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(DisablePeerProcedure.class); + + public DisablePeerProcedure() { + } + + public DisablePeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.DISABLE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preDisableReplicationPeer(peerId); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) + throws IllegalArgumentException, Exception { + env.getReplicationManager().disableReplicationPeer(peerId); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully disabled peer " + peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postDisableReplicationPeer(peerId); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java new file mode 100644 index 0000000..92ba000 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The procedure for enabling a replication peer. 
+ */ +@InterfaceAudience.Private +public class EnablePeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(EnablePeerProcedure.class); + + public EnablePeerProcedure() { + } + + public EnablePeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ENABLE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preEnableReplicationPeer(peerId); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { + env.getReplicationManager().enableReplicationPeer(peerId); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully enabled peer " + peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postEnableReplicationPeer(peerId); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index fca05a7..7076bab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -21,15 +21,22 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyPeerStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +/** + * The base class for all replication peer related procedures. + */ @InterfaceAudience.Private public abstract class ModifyPeerProcedure extends StateMachineProcedure<MasterProcedureEnv, PeerModificationState> @@ -39,11 +46,21 @@ public abstract class ModifyPeerProcedure protected String peerId; + // used to keep compatible with old client where we can only return after updateStorage. + protected ProcedurePrepareLatch latch; + protected ModifyPeerProcedure() { } protected ModifyPeerProcedure(String peerId) { this.peerId = peerId; + // TODO: temporarily set a 4.0 here to always wait for the procedure execution to complete. Change + // to 3.0 or 2.0 after the client modification is done. + this.latch = ProcedurePrepareLatch.createLatch(4, 0); + } + + public ProcedurePrepareLatch getLatch() { + return latch; } @Override @@ -52,28 +69,58 @@ public abstract class ModifyPeerProcedure } /** - * Return {@code false} means that the operation is invalid and we should give up, otherwise - * {@code true}. + * Called before we start the actual processing. If an exception is thrown then we will give up + * and mark the procedure as failed directly. 
+ */ + protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException; + + /** + * We will give up and mark the procedure as failure if {@link IllegalArgumentException} is + * thrown, for other types of Exception we will retry. + */ + protected abstract void updatePeerStorage(MasterProcedureEnv env) + throws IllegalArgumentException, Exception; + + /** + * Called before we finish the procedure. The implementation can do some logging work, and also + * call the coprocessor hook if any. * <p> - * You need to call {@link #setFailure(String, Throwable)} to give the detail failure information. + * Notice that, since we have already done the actual work, throwing an exception here will not fail + * this procedure, we will just ignore it and finish the procedure as succeeded. */ - protected abstract boolean updatePeerStorage() throws IOException; + protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException; - protected void postPeerModification() { + private void releaseLatch() { + ProcedurePrepareLatch.releaseLatch(latch, this); } @Override protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { switch (state) { - case UPDATE_PEER_STORAGE: + case PRE_PEER_MODIFICATION: try { - if (!updatePeerStorage()) { - assert isFailed() : "setFailure is not called"; - return Flow.NO_MORE_STATE; - } + prePeerModification(env); } catch (IOException e) { - LOG.warn("update peer storage failed, retry", e); + LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + + ", mark the procedure as failure and give up", e); + setFailure("prePeerModification", e); + releaseLatch(); + return Flow.NO_MORE_STATE; + } + setNextState(PeerModificationState.UPDATE_PEER_STORAGE); + return Flow.HAS_MORE_STATE; + case UPDATE_PEER_STORAGE: + try { + updatePeerStorage(env); + } catch (IllegalArgumentException e) { + 
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", + new DoNotRetryIOException(e)); + releaseLatch(); + return Flow.NO_MORE_STATE; + } catch (Exception e) { + LOG.warn( + getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e); throw new ProcedureYieldException(); } setNextState(PeerModificationState.REFRESH_PEER_ON_RS); @@ -85,7 +132,13 @@ public abstract class ModifyPeerProcedure setNextState(PeerModificationState.POST_PEER_MODIFICATION); return Flow.HAS_MORE_STATE; case POST_PEER_MODIFICATION: - postPeerModification(); + try { + postPeerModification(env); + } catch (IOException e) { + LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + + ", ignore since the procedure has already done", e); + } + releaseLatch(); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); @@ -107,6 +160,12 @@ public abstract class ModifyPeerProcedure @Override protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) throws IOException, InterruptedException { + if (state == PeerModificationState.PRE_PEER_MODIFICATION || + state == PeerModificationState.UPDATE_PEER_STORAGE) { + // actually the peer related operations have no rollback, but if we haven't done any + // modifications on the peer storage, we can just return. 
+ return; + } throw new UnsupportedOperationException(); } @@ -122,6 +181,18 @@ public abstract class ModifyPeerProcedure @Override protected PeerModificationState getInitialState() { - return PeerModificationState.UPDATE_PEER_STORAGE; + return PeerModificationState.PRE_PEER_MODIFICATION; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(ModifyPeerStateData.newBuilder().setPeerId(peerId).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + peerId = serializer.deserialize(ModifyPeerStateData.class).getPeerId(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java deleted file mode 100644 index 4e09107..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.replication; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.yetus.audience.InterfaceAudience; - -import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; - -/** - * The callable executed at RS side to refresh the peer config/state. - * <p> - * TODO: only a dummy implementation for verifying the framework, will add implementation later. 
- */ -@InterfaceAudience.Private -public class RefreshPeerCallable implements RSProcedureCallable { - - private HRegionServer rs; - - private String peerId; - - private Exception initError; - - @Override - public Void call() throws Exception { - if (initError != null) { - throw initError; - } - rs.getFileSystem().create(new Path("/" + peerId + "/" + rs.getServerName().toString())).close(); - return null; - } - - @Override - public void init(byte[] parameter, HRegionServer rs) { - this.rs = rs; - try { - this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId(); - } catch (InvalidProtocolBufferException e) { - initError = e; - return; - } - } - - @Override - public EventType getEventType() { - return EventType.RS_REFRESH_PEER; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 18da487..ddc2401 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable; import org.apache.yetus.audience.InterfaceAudience; import 
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -118,15 +120,22 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); } - private void complete(MasterProcedureEnv env, boolean succ) { + private void complete(MasterProcedureEnv env, Throwable error) { if (event == null) { LOG.warn("procedure event for " + getProcId() + - " is null, maybe the procedure is created when recovery", new Exception()); + " is null, maybe the procedure is created when recovery", + new Exception()); return; } - LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + - (succ ? " suceeded" : " failed")); - this.succ = succ; + if (error != null) { + LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed", + error); + this.succ = false; + } else { + LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded"); + this.succ = true; + } + event.wake(env.getProcedureScheduler()); event = null; } @@ -134,17 +143,18 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> @Override public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) { - complete(env, false); + complete(env, exception); } @Override public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { - complete(env, true); + complete(env, null); } @Override - public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) { - complete(env, false); + public synchronized void remoteOperationFailed(MasterProcedureEnv env, + RemoteProcedureException error) { + complete(env, error); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java new file mode 100644 index 0000000..3daad6d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The procedure for removing a replication peer. 
+ */ +@InterfaceAudience.Private +public class RemovePeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(RemovePeerProcedure.class); + + public RemovePeerProcedure() { + } + + public RemovePeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.REMOVE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preRemoveReplicationPeer(peerId); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { + env.getReplicationManager().removeReplicationPeer(peerId); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully removed peer " + peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postRemoveReplicationPeer(peerId); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java index f36b2e2..b6f8784 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java @@ -27,10 +27,8 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import 
org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; @@ -39,24 +37,21 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; /** * Manages and performs all replication admin operations. + * <p> * Used to add/remove a replication peer. */ @InterfaceAudience.Private public class ReplicationManager { - - private final Configuration conf; - private final ZKWatcher zkw; private final ReplicationQueuesClient replicationQueuesClient; private final ReplicationPeers replicationPeers; public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable) throws IOException { - this.conf = conf; - this.zkw = zkw; try { this.replicationQueuesClient = ReplicationFactory .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw)); @@ -70,7 +65,7 @@ public class ReplicationManager { } public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException, IOException { + throws ReplicationException { checkPeerConfig(peerConfig); replicationPeers.registerPeer(peerId, peerConfig, enabled); replicationPeers.peerConnected(peerId); @@ -89,8 +84,8 @@ public class ReplicationManager { this.replicationPeers.disablePeer(peerId); } - public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException, - ReplicationPeerNotFoundException { + public ReplicationPeerConfig getPeerConfig(String peerId) + throws ReplicationException, 
ReplicationPeerNotFoundException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId); if (peerConfig == null) { throw new ReplicationPeerNotFoundException(peerId); @@ -110,9 +105,9 @@ public class ReplicationManager { List<String> peerIds = replicationPeers.getAllPeerIds(); for (String peerId : peerIds) { if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) { - peers.add(new ReplicationPeerDescription(peerId, replicationPeers - .getStatusOfPeerFromBackingStore(peerId), replicationPeers - .getReplicationPeerConfig(peerId))); + peers.add(new ReplicationPeerDescription(peerId, + replicationPeers.getStatusOfPeerFromBackingStore(peerId), + replicationPeers.getReplicationPeerConfig(peerId))); } } return peers; @@ -126,13 +121,12 @@ public class ReplicationManager { * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. 
*/ - private void checkPeerConfig(ReplicationPeerConfig peerConfig) - throws ReplicationException, IOException { + private void checkPeerConfig(ReplicationPeerConfig peerConfig) { if (peerConfig.replicateAllUserTables()) { - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) - || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new ReplicationException("Need clean namespaces or table-cfs config firstly" - + " when replicate_all flag is true"); + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || + (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " + + "when you want replicate all cluster"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), peerConfig.getExcludeTableCFsMap()); @@ -141,7 +135,7 @@ public class ReplicationManager { && !peerConfig.getExcludeNamespaces().isEmpty()) || (peerConfig.getExcludeTableCFsMap() != null && !peerConfig.getExcludeTableCFsMap().isEmpty())) { - throw new ReplicationException( + throw new IllegalArgumentException( "Need clean exclude-namespaces or exclude-table-cfs config firstly" + " when replicate_all flag is false"); } @@ -154,20 +148,24 @@ public class ReplicationManager { /** * Set a namespace in the peer config means that all tables in this namespace will be replicated * to the peer cluster. - * 1. If peer config already has a namespace, then not allow set any table of this namespace - * to the peer config. - * 2. If peer config already has a table, then not allow set this table's namespace to the peer - * config. 
- * + * <ol> + * <li>If peer config already has a namespace, then not allow set any table of this namespace to + * the peer config.</li> + * <li>If peer config already has a table, then not allow set this table's namespace to the peer + * config.</li> + * </ol> + * <p> * Set a exclude namespace in the peer config means that all tables in this namespace can't be * replicated to the peer cluster. - * 1. If peer config already has a exclude namespace, then not allow set any exclude table of - * this namespace to the peer config. - * 2. If peer config already has a exclude table, then not allow set this table's namespace - * as a exclude namespace. + * <ol> + * <li>If peer config already has a exclude namespace, then not allow set any exclude table of + * this namespace to the peer config.</li> + * <li>If peer config already has a exclude table, then not allow set this table's namespace as a + * exclude namespace.</li> + * </ol> */ private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, - Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException { + Map<TableName, ? extends Collection<String>> tableCfs) { if (namespaces == null || namespaces.isEmpty()) { return; } @@ -177,24 +175,22 @@ public class ReplicationManager { for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { TableName table = entry.getKey(); if (namespaces.contains(table.getNamespaceAsString())) { - throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces " + throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces " + table.getNamespaceAsString() + " in peer config"); } } } - private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) - throws IOException { - String filterCSV = peerConfig.getConfiguration(). 
- get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); - if (filterCSV != null && !filterCSV.isEmpty()){ - String [] filters = filterCSV.split(","); + private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) { + String filterCSV = peerConfig.getConfiguration() + .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterCSV != null && !filterCSV.isEmpty()) { + String[] filters = filterCSV.split(","); for (String filter : filters) { try { - Class clazz = Class.forName(filter); - Object o = clazz.newInstance(); + Class.forName(filter).newInstance(); } catch (Exception e) { - throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + + throw new IllegalArgumentException("Configured WALEntryFilter " + filter + " could not be created. Failing add/update " + "peer operation.", e); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java new file mode 100644 index 0000000..435eefc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData; + +/** + * The procedure for updating the config for a replication peer. 
+ */ +@InterfaceAudience.Private +public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(UpdatePeerConfigProcedure.class); + + private ReplicationPeerConfig peerConfig; + + public UpdatePeerConfigProcedure() { + } + + public UpdatePeerConfigProcedure(String peerId, ReplicationPeerConfig peerConfig) { + super(peerId); + this.peerConfig = peerConfig; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.UPDATE_CONFIG; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) + throws IllegalArgumentException, Exception { + env.getReplicationManager().updatePeerConfig(peerId, peerConfig); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig); + } + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(UpdatePeerConfigStateData.newBuilder() + .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + peerConfig = ReplicationPeerConfigUtil + .convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig()); + } +} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 03b7ef7..7c6f286 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JvmPauseMonitor; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; @@ -209,7 +210,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; @@ -3722,7 +3722,7 @@ public class HRegionServer extends HasThread implements ReportProcedureDoneRequest.newBuilder().setProcId(procId); if (error != null) { 
builder.setStatus(ReportProcedureDoneRequest.Status.ERROR) - .setError(Throwables.getStackTraceAsString(error)); + .setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error)); } else { builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java new file mode 100644 index 0000000..a47a483 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; + +/** + * The callable executed at RS side to refresh the peer config/state. + * <p> + * TODO: only a dummy implementation for verifying the framework, will add implementation later. + */ +@InterfaceAudience.Private +public class RefreshPeerCallable implements RSProcedureCallable { + + private HRegionServer rs; + + private String peerId; + + private Exception initError; + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + Path dir = new Path("/" + peerId); + if (rs.getFileSystem().exists(dir)) { + rs.getFileSystem().create(new Path(dir, rs.getServerName().toString())).close(); + } + return null; + } + + @Override + public void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + try { + this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId(); + } catch (InvalidProtocolBufferException e) { + initError = e; + return; + } + } + + @Override + public EventType getEventType() { + return EventType.RS_REFRESH_PEER; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 7b6af0e..8442530 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -369,7 +370,6 @@ public class MockNoopMasterServices implements MasterServices, Server { @Override public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub return null; } @@ -399,20 +399,24 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { + return 0; } @Override - public void removeReplicationPeer(String peerId) throws ReplicationException { + public long removeReplicationPeer(String peerId) throws ReplicationException { + return 0; } @Override - public void enableReplicationPeer(String peerId) throws ReplicationException, IOException { + public long enableReplicationPeer(String peerId) throws ReplicationException, IOException { + return 0; } @Override - public void disableReplicationPeer(String peerId) throws ReplicationException, IOException { + public long disableReplicationPeer(String peerId) throws ReplicationException, IOException { + return 0; } @Override @@ -422,8 +426,9 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override 
- public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) + public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { + return 0; } @Override @@ -458,7 +463,6 @@ public class MockNoopMasterServices implements MasterServices, Server { @Override public ProcedureEvent getInitializedEvent() { - // TODO Auto-generated method stub return null; } @@ -471,4 +475,9 @@ public class MockNoopMasterServices implements MasterServices, Server { public Connection createConnection(Configuration conf) throws IOException { return null; } + + @Override + public ReplicationManager getReplicationManager() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ab7a94e4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java index 44343d7..ed7c6fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.master.replication; -import java.io.IOException; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; public class DummyModifyPeerProcedure extends ModifyPeerProcedure { @@ -34,8 +34,15 @@ public class DummyModifyPeerProcedure extends ModifyPeerProcedure { } @Override - protected boolean updatePeerStorage() throws IOException { - return true; + protected void prePeerModification(MasterProcedureEnv env) { + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) { + } + + @Override + protected void 
postPeerModification(MasterProcedureEnv env) { } }