HBASE-19781 Add a new cluster state flag for synchronous replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7adf2719 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7adf2719 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7adf2719 Branch: refs/heads/HBASE-19064 Commit: 7adf2719d3137e6fe18a4813c159cac6924b3e86 Parents: 669dbcc Author: Guanghao Zhang <zg...@apache.org> Authored: Mon Jan 22 11:44:49 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Mar 27 17:30:56 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Admin.java | 39 +++++ .../apache/hadoop/hbase/client/AsyncAdmin.java | 31 ++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 7 + .../hbase/client/ConnectionImplementation.java | 9 ++ .../apache/hadoop/hbase/client/HBaseAdmin.java | 26 +++ .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 15 ++ .../client/ShortCircuitMasterConnection.java | 9 ++ .../replication/ReplicationPeerConfigUtil.java | 26 +-- .../replication/ReplicationPeerDescription.java | 10 +- .../hbase/replication/SyncReplicationState.java | 48 ++++++ .../hbase/shaded/protobuf/RequestConverter.java | 10 ++ .../src/main/protobuf/Master.proto | 4 + .../src/main/protobuf/MasterProcedure.proto | 4 + .../src/main/protobuf/Replication.proto | 20 +++ .../replication/ReplicationPeerStorage.java | 18 ++- .../hbase/replication/ReplicationUtils.java | 1 + .../replication/ZKReplicationPeerStorage.java | 60 +++++-- .../replication/TestReplicationStateBasic.java | 23 ++- .../TestZKReplicationPeerStorage.java | 12 +- .../hbase/coprocessor/MasterObserver.java | 23 +++ .../org/apache/hadoop/hbase/master/HMaster.java | 12 ++ .../hbase/master/MasterCoprocessorHost.java | 21 +++ .../hadoop/hbase/master/MasterRpcServices.java | 17 ++ .../hadoop/hbase/master/MasterServices.java | 9 ++ .../procedure/PeerProcedureInterface.java | 2 +- .../replication/ReplicationPeerManager.java | 51 +++++- ...ransitPeerSyncReplicationStateProcedure.java | 159 +++++++++++++++++++ .../hbase/security/access/AccessController.java | 8 + .../replication/TestReplicationAdmin.java | 62 ++++++++ .../hbase/master/MockNoopMasterServices.java | 8 +- .../cleaner/TestReplicationHFileCleaner.java | 4 +- .../TestReplicationTrackerZKImpl.java | 6 +- .../TestReplicationSourceManager.java | 3 +- .../security/access/TestAccessController.java | 16 ++ .../hbase/util/TestHBaseFsckReplication.java | 5 +- .../src/main/ruby/hbase/replication_admin.rb | 15 ++ hbase-shell/src/main/ruby/shell.rb | 1 + .../src/main/ruby/shell/commands/list_peers.rb | 6 +- .../transit_peer_sync_replication_state.rb | 44 +++++ .../test/ruby/hbase/replication_admin_test.rb | 24 +++ 40 files changed, 815 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index b8546fa..167d6f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; @@ -2648,6 +2649,44 @@ public interface Admin extends Abortable, Closeable { List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException; /** + * Transit current cluster to a new state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @param state a new state of current cluster + * @throws IOException if a remote or network exception occurs + */ + void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws IOException; + + /** + * Transit current cluster to a new state in a synchronous replication peer. But does not block + * and wait for it. + * <p> + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param state a new state of current cluster + * @throws IOException if a remote or network exception occurs + */ + Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerId, + SyncReplicationState state) throws IOException; + + /** + * Get the current cluster state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @return the current cluster state + * @throws IOException if a remote or network exception occurs + */ + default SyncReplicationState getReplicationPeerSyncReplicationState(String peerId) + throws IOException { + List<ReplicationPeerDescription> peers = listReplicationPeers(Pattern.compile(peerId)); + if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) { + throw new IOException("Replication peer " + peerId + " does not exist"); + } + return peers.get(0).getSyncReplicationState(); + } + + /** * Mark region server(s) as decommissioned to prevent additional regions from getting * assigned to them. Optionally unload the regions on the servers. If there are multiple servers * to be decommissioned, decommissioning them at the same time can prevent wasteful region http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 35cdd3f..895e7ff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import com.google.protobuf.RpcChannel; +import java.io.IOException; import java.util.Collection; import java.util.EnumSet; import java.util.List; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; /** @@ -600,6 +602,35 @@ public interface AsyncAdmin { ReplicationPeerConfig peerConfig); /** + * Transit current cluster to a new state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @param state a new state of current cluster + */ + CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState state); + + /** + * Get the current cluster state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @return the current cluster state wrapped by a {@link CompletableFuture}. + */ + default CompletableFuture<SyncReplicationState> + getReplicationPeerSyncReplicationState(String peerId) { + CompletableFuture<SyncReplicationState> future = new CompletableFuture<>(); + listReplicationPeers(Pattern.compile(peerId)).whenComplete((peers, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) { + future.completeExceptionally( + new IOException("Replication peer " + peerId + " does not exist")); + } else { + future.complete(peers.get(0).getSyncReplicationState()); + } + }); + return future; + } + + /** * Append the replicable table-cf config of the specified peer * @param peerId a short that identifies the cluster * @param tableCfs A map from tableName to column family names http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 9b2390c..44771fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; /** @@ -414,6 +415,12 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState clusterState) { + return wrap(rawAdmin.transitReplicationPeerSyncReplicationState(peerId, clusterState)); + } + + @Override public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId, Map<TableName, List<String>> tableCfs) { return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs)); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 7723161..efd9098 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -123,6 +123,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; @@ -1721,6 +1723,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { MasterProtos.ClearDeadServersRequest request) throws ServiceException { return stub.clearDeadServers(controller, request); } + + @Override + public TransitReplicationPeerSyncReplicationStateResponse + transitReplicationPeerSyncReplicationState(RpcController controller, + TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { + return stub.transitReplicationPeerSyncReplicationState(controller, request); + } }; } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 8685984..c01b891 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; @@ -206,6 +207,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Disab import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -3991,6 +3993,30 @@ public class HBaseAdmin implements Admin { } @Override + public void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws IOException { + get(transitReplicationPeerSyncReplicationStateAsync(peerId, state), this.syncWaitTimeout, + TimeUnit.MILLISECONDS); + } + + @Override + public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerId, + SyncReplicationState state) throws IOException { + TransitReplicationPeerSyncReplicationStateResponse response = + executeCallable(new MasterCallable<TransitReplicationPeerSyncReplicationStateResponse>( + getConnection(), getRpcControllerFactory()) { + @Override + protected TransitReplicationPeerSyncReplicationStateResponse rpcCall() throws Exception { + return master.transitReplicationPeerSyncReplicationState(getRpcController(), + RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, + state)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"); + } + + @Override public void appendReplicationPeerTableCFs(String id, Map<TableName, List<String>> tableCfs) throws ReplicationException, IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 050bfe2..30a372d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.quotas.QuotaTableUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; @@ -255,6 +256,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -1613,6 +1616,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState clusterState) { + return this + .<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall( + RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, + clusterState), + (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), + (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, + () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); + } + + @Override public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, Map<TableName, List<String>> tableCfs) { if (tableCfs == null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 50690b4..7bb65d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -166,6 +166,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; @@ -638,4 +640,11 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { throws ServiceException { return stub.splitRegion(controller, request); } + + @Override + public TransitReplicationPeerSyncReplicationStateResponse + transitReplicationPeerSyncReplicationState(RpcController controller, + TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { + return stub.transitReplicationPeerSyncReplicationState(controller, request); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 474ded3..6cbe05b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -396,25 +397,28 @@ public final class ReplicationPeerConfigUtil { return ProtobufUtil.prependPBMagic(bytes); } - public static ReplicationPeerDescription toReplicationPeerDescription( - ReplicationProtos.ReplicationPeerDescription desc) { - boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState() - .getState(); + public static ReplicationPeerDescription + toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) { + boolean enabled = + ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState(); ReplicationPeerConfig config = convert(desc.getConfig()); - return new ReplicationPeerDescription(desc.getId(), enabled, config); + return new ReplicationPeerDescription(desc.getId(), enabled, config, + SyncReplicationState.valueOf(desc.getSyncReplicationState().getNumber())); } - public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription( - ReplicationPeerDescription desc) { + public static ReplicationProtos.ReplicationPeerDescription + toProtoReplicationPeerDescription(ReplicationPeerDescription desc) { ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription.newBuilder(); builder.setId(desc.getPeerId()); - ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState - .newBuilder(); - stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED - : ReplicationProtos.ReplicationState.State.DISABLED); + ReplicationProtos.ReplicationState.Builder stateBuilder = + ReplicationProtos.ReplicationState.newBuilder(); + stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED : + ReplicationProtos.ReplicationState.State.DISABLED); builder.setState(stateBuilder.build()); builder.setConfig(convert(desc.getPeerConfig())); + builder.setSyncReplicationState( + ReplicationProtos.SyncReplicationState.forNumber(desc.getSyncReplicationState().ordinal())); return builder.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java index ba97d07..2d077c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java @@ -28,11 +28,14 @@ public class ReplicationPeerDescription { private final String id; private final boolean enabled; private final ReplicationPeerConfig config; + private final SyncReplicationState syncReplicationState; - public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config) { + public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config, + SyncReplicationState syncReplicationState) { this.id = id; this.enabled = enabled; this.config = config; + this.syncReplicationState = syncReplicationState; } public String getPeerId() { @@ -47,11 +50,16 @@ public class ReplicationPeerDescription { return this.config; } + public SyncReplicationState getSyncReplicationState() { + return this.syncReplicationState; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("id : ").append(id); builder.append(", enabled : " + enabled); builder.append(", config : " + config); + builder.append(", syncReplicationState : " + syncReplicationState); return builder.toString(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java new file mode 100644 index 0000000..bd144e9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used by synchronous replication. Indicate the state of the current cluster in a synchronous + * replication peer. The state may be one of {@link SyncReplicationState#ACTIVE}, + * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or + * {@link SyncReplicationState#STANDBY}. + * <p> + * For asynchronous replication, the state is {@link SyncReplicationState#NONE}. + */ +@InterfaceAudience.Public +public enum SyncReplicationState { + NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY; + + public static SyncReplicationState valueOf(int value) { + switch (value) { + case 0: + return NONE; + case 1: + return ACTIVE; + case 2: + return DOWNGRADE_ACTIVE; + case 3: + return STANDBY; + default: + throw new IllegalArgumentException("Unknown synchronous replication state " + value); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 8ce2f1b..4e67b78 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -147,6 +148,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; /** @@ -1867,4 +1869,12 @@ public final class RequestConverter { } return pbServers; } + + public static TransitReplicationPeerSyncReplicationStateRequest + buildTransitReplicationPeerSyncReplicationStateRequest(String peerId, + SyncReplicationState state) { + return TransitReplicationPeerSyncReplicationStateRequest.newBuilder().setPeerId(peerId) + .setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal())) + .build(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-protocol-shaded/src/main/protobuf/Master.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 3a236c0..c2ab180 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -962,6 +962,10 @@ service MasterService { rpc ListReplicationPeers(ListReplicationPeersRequest) returns(ListReplicationPeersResponse); + /** Transit the state of current cluster in a synchronous replication peer */ + rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest) + returns(TransitReplicationPeerSyncReplicationStateResponse); + /** Returns a list of ServerNames marked as decommissioned. */ rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest) returns(ListDecommissionedRegionServersResponse); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index b37557c..acc2cbd 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -431,3 +431,7 @@ message EnablePeerStateData { message DisablePeerStateData { } + +message TransitPeerSyncReplicationStateStateData { + required SyncReplicationState syncReplicationState = 1; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-protocol-shaded/src/main/protobuf/Replication.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 20dd049..3564ae4 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -64,12 +64,23 @@ message ReplicationState { } /** + * Indicate the state of the current cluster in a synchronous replication peer. + */ +enum SyncReplicationState { + NONE = 0; + ACTIVE = 1; + DOWNGRADE_ACTIVE = 2; + STANDBY = 3; +} + +/** * Used by replication. Description of the replication peer. */ message ReplicationPeerDescription { required string id = 1; required ReplicationState state = 2; required ReplicationPeer config = 3; + optional SyncReplicationState syncReplicationState = 4; } /** @@ -138,3 +149,12 @@ message ListReplicationPeersRequest { message ListReplicationPeersResponse { repeated ReplicationPeerDescription peer_desc = 1; } + +message TransitReplicationPeerSyncReplicationStateRequest { + required string peer_id = 1; + required SyncReplicationState syncReplicationState = 2; +} + +message TransitReplicationPeerSyncReplicationStateResponse { + required uint64 proc_id = 1; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java index 1adda02..d2538ab 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -31,8 +31,8 @@ public interface ReplicationPeerStorage { * Add a replication peer. * @throws ReplicationException if there are errors accessing the storage service. */ - void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException; + void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled, + SyncReplicationState syncReplicationState) throws ReplicationException; /** * Remove a replication peer. @@ -70,4 +70,18 @@ public interface ReplicationPeerStorage { * @throws ReplicationException if there are errors accessing the storage service. */ ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException; + + /** + * Set the state of current cluster in a synchronous replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void setPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws ReplicationException; + + /** + * Get the state of current cluster in a synchronous replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + SyncReplicationState getPeerSyncReplicationState(String peerId) + throws ReplicationException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index c7568bb..e4dea83 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index a53500a..338ce3f 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; @@ -51,6 +52,8 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase public static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = "sync-rep-state"; + /** * The name of the znode that contains the replication status of a remote slave (i.e. peer) * cluster. @@ -79,21 +82,29 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase return ZNodePaths.joinZNode(peersZNode, peerId); } + @VisibleForTesting + public String getSyncReplicationStateNode(String peerId) { + return ZNodePaths.joinZNode(getPeerNode(peerId), SYNCHRONOUS_REPLICATION_STATE_ZNODE); + } + @Override - public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException { + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled, + SyncReplicationState syncReplicationState) throws ReplicationException { + List<ZKUtilOp> multiOps = Arrays.asList( + ZKUtilOp.createAndFailSilent(getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)), + ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), + enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES), + ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId), + Bytes.toBytes(syncReplicationState.ordinal()))); try { ZKUtil.createWithParents(zookeeper, peersZNode); - ZKUtil.multiOrSequential(zookeeper, - Arrays.asList( - ZKUtilOp.createAndFailSilent(getPeerNode(peerId), - ReplicationPeerConfigUtil.toByteArray(peerConfig)), - ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), - enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), - false); + ZKUtil.multiOrSequential(zookeeper, multiOps, false); } catch (KeeperException e) { - throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" - + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); + throw new ReplicationException( + "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" + + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState, + e); } } @@ -166,4 +177,31 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase "Failed to parse replication peer config for peer with id=" + peerId, e); } } + + @Override + public void setPeerSyncReplicationState(String peerId, SyncReplicationState clusterState) + throws ReplicationException { + byte[] clusterStateBytes = Bytes.toBytes(clusterState.ordinal()); + try { + ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), clusterStateBytes); + } catch (KeeperException e) { + throw new ReplicationException( + "Unable to change the cluster state for the synchronous replication peer with id=" + + peerId, + e); + } + } + + @Override + public SyncReplicationState getPeerSyncReplicationState(String peerId) + throws ReplicationException { + byte[] data; + try { + data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException( + "Error getting cluster state for the synchronous replication peer with id=" + peerId, e); + } + return SyncReplicationState.valueOf(Bytes.toInt(data)); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 437804c..4a2c3cd 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -166,7 +166,8 @@ public abstract class TestReplicationStateBasic { assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); rqs.addPeerToHFileRefs(ID_ONE); rqs.addHFileRefs(ID_ONE, files1); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); @@ -189,10 +190,12 @@ public abstract class TestReplicationStateBasic { public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { rp.init(); rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); rqs.addPeerToHFileRefs(ID_ONE); rp.getPeerStorage().addPeer(ID_TWO, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, + SyncReplicationState.NONE); rqs.addPeerToHFileRefs(ID_TWO); List<Pair<Path, Path>> files1 = new ArrayList<>(3); @@ -241,9 +244,13 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(0); // Add some peers - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); + rp.getPeerStorage().addPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); assertNumberOfPeers(1); - rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); + rp.getPeerStorage().addPeer(ID_TWO, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, + SyncReplicationState.NONE); assertNumberOfPeers(2); assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils @@ -253,7 +260,9 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(1); // Add one peer - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); + rp.getPeerStorage().addPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); rp.addPeer(ID_ONE); assertNumberOfPeers(2); assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); @@ -365,7 +374,7 @@ public abstract class TestReplicationStateBasic { // Add peers for the corresponding queues so they are not orphans rp.getPeerStorage().addPeer("qId" + i, ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(), - true); + true, SyncReplicationState.NONE); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index 3290fb0..1258695 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -87,8 +87,9 @@ public class TestZKReplicationPeerStorage { Random rand = new Random(seed); return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) - .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) - .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) + .setRemoteWALDir(Long.toHexString(rand.nextLong())).setNamespaces(randNamespaces(rand)) + .setExcludeNamespaces(randNamespaces(rand)).setTableCFsMap(randTableCFs(rand)) + .setExcludeTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) .setBandwidth(rand.nextInt(1000)).build(); } @@ -139,7 +140,8 @@ public class TestZKReplicationPeerStorage { public void test() throws ReplicationException { int peerCount = 10; for (int i = 0; i < peerCount; i++) { - STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0, + SyncReplicationState.valueOf(i % 4)); } List<String> peerIds = STORAGE.listPeerIds(); assertEquals(peerCount, peerIds.size()); @@ -163,6 +165,10 @@ public class TestZKReplicationPeerStorage { for (int i = 0; i < peerCount; i++) { assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); } + for (int i = 0; i < peerCount; i++) { + assertEquals(SyncReplicationState.valueOf(i % 4), + STORAGE.getPeerSyncReplicationState(Integer.toString(i))); + } String toRemove = Integer.toString(peerCount / 2); STORAGE.removePeer(toRemove); peerIds = STORAGE.listPeerIds(); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index a17bc9f..8d2b55f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -1232,6 +1233,28 @@ public interface MasterObserver { String regex) throws IOException {} /** + * Called before transit current cluster state for the specified synchronous replication peer + * @param ctx the environment to interact with the framework and master + * @param peerId a short name that identifies the peer + * @param state a new state + */ + default void preTransitReplicationPeerSyncReplicationState( + final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId, + SyncReplicationState state) throws IOException { + } + + /** + * Called after transit current cluster state for the specified synchronous replication peer + * @param ctx the environment to interact with the framework and master + * @param peerId a short name that identifies the peer + * @param state a new state + */ + default void postTransitReplicationPeerSyncReplicationState( + final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId, + SyncReplicationState state) throws IOException { + } + + /** * Called before new LockProcedure is queued. * @param ctx the environment to interact with the framework and master */ http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f5bd0de..782193e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.mob.MobConstants; @@ -170,6 +171,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; @@ -3449,6 +3451,16 @@ public class HMaster extends HRegionServer implements MasterServices { return peers; } + @Override + public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws ReplicationException, IOException { + LOG.info( + getClientIdAuditPrefix() + + " transit current cluster state to {} in a synchronous replication peer id={}", + state, peerId); + return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state)); + } + /** * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0 http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 8c8c02c..cc008bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -1530,6 +1531,26 @@ public class MasterCoprocessorHost }); } + public void preTransitReplicationPeerSyncReplicationState(final String peerId, + final SyncReplicationState clusterState) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.preTransitReplicationPeerSyncReplicationState(this, peerId, clusterState); + } + }); + } + + public void postTransitReplicationPeerSyncReplicationState(final String peerId, + final SyncReplicationState clusterState) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.postTransitReplicationPeerSyncReplicationState(this, peerId, clusterState); + } + }); + } + public void preRequestLock(String namespace, TableName tableName, RegionInfo[] regionInfos, LockType type, String description) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index fa4ddf5..5de5681 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.AccessController; @@ -291,6 +292,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; @@ -1943,6 +1946,20 @@ public class MasterRpcServices extends RSRpcServices } @Override + public TransitReplicationPeerSyncReplicationStateResponse + transitReplicationPeerSyncReplicationState(RpcController controller, + TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { + try { + long procId = master.transitReplicationPeerSyncReplicationState(request.getPeerId(), + SyncReplicationState.valueOf(request.getSyncReplicationState().getNumber())); + return TransitReplicationPeerSyncReplicationStateResponse.newBuilder().setProcId(procId) + .build(); + } catch (ReplicationException | IOException e) { + throw new ServiceException(e); + } + } + + @Override public ListReplicationPeersResponse listReplicationPeers(RpcController controller, ListReplicationPeersRequest request) throws ServiceException { ListReplicationPeersResponse.Builder response = ListReplicationPeersResponse.newBuilder(); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 52046c5..76aa2d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -480,6 +481,14 @@ public interface MasterServices extends Server { IOException; /** + * Set current cluster state for a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @param clusterState state of current cluster + */ + long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState clusterState) + throws ReplicationException, IOException; + + /** * @return {@link LockManager} to lock namespaces/tables/regions. */ LockManager getLockManager(); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java index 4abc9ad..fc5348e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java @@ -25,7 +25,7 @@ import org.apache.yetus.audience.InterfaceStability; public interface PeerProcedureInterface { enum PeerOperationType { - ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH + ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE } String getPeerId(); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 05ecd61..f07a0d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumMap; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -43,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -61,6 +64,16 @@ public class ReplicationPeerManager { private final ConcurrentMap<String, ReplicationPeerDescription> peers; + private final EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>> allowedTransition = + new EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>>(SyncReplicationState.class) { + { + put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); + put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); + put(SyncReplicationState.DOWNGRADE_ACTIVE, + EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)); + } + }; + ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers) { this.peerStorage = peerStorage; @@ -167,6 +180,17 @@ public class ReplicationPeerManager { return desc; } + public void preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws DoNotRetryIOException { + ReplicationPeerDescription desc = checkPeerExists(peerId); + SyncReplicationState fromState = desc.getSyncReplicationState(); + EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState); + if (allowedToStates == null || !allowedToStates.contains(state)) { + throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState + + " to " + state + " for peer id=" + peerId); + } + } + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { if (peers.containsKey(peerId)) { @@ -174,8 +198,12 @@ public class ReplicationPeerManager { return; } ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); - peerStorage.addPeer(peerId, copiedPeerConfig, enabled); - peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); + SyncReplicationState syncReplicationState = + StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE + : SyncReplicationState.DOWNGRADE_ACTIVE; + peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); + peers.put(peerId, + new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); } public void removePeer(String peerId) throws ReplicationException { @@ -194,7 +222,8 @@ public class ReplicationPeerManager { return; } peerStorage.setPeerState(peerId, enabled); - peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(), + desc.getSyncReplicationState())); } public void enablePeer(String peerId) throws ReplicationException { @@ -219,7 +248,8 @@ public class ReplicationPeerManager { newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); peerStorage.updatePeerConfig(peerId, newPeerConfig); - peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); + peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig, + desc.getSyncReplicationState())); } public List<ReplicationPeerDescription> listPeers(Pattern pattern) { @@ -239,7 +269,15 @@ public class ReplicationPeerManager { queueStorage.removeLastSequenceIds(peerId); } - void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws ReplicationException { + ReplicationPeerDescription desc = peers.get(peerId); + peerStorage.setPeerSyncReplicationState(peerId, state); + peers.put(peerId, + new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state)); + } + + public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // on-going when the refresh peer config procedure is done, if a RS which has already been // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in @@ -368,7 +406,8 @@ public class ReplicationPeerManager { for (String peerId : peerStorage.listPeerIds()) { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); - peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); + SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state)); } return new ReplicationPeerManager(peerStorage, ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java new file mode 100644 index 0000000..d26eecc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * The procedure for transit current cluster state for a synchronous replication peer. + */ +@InterfaceAudience.Private +public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure { + + private static final Logger LOG = + LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); + + private SyncReplicationState state; + + public TransitPeerSyncReplicationStateProcedure() { + } + + public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) { + super(peerId); + this.state = state; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state); + } + env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state); + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}", + state, peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, state); + } + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder() + .setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal())) + .build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + TransitPeerSyncReplicationStateStateData data = + serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); + state = SyncReplicationState.valueOf(data.getSyncReplicationState().getNumber()); + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case PRE_PEER_MODIFICATION: + try { + prePeerModification(env); + } catch (IOException e) { + LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " + + "mark the procedure as failure and give up", getClass().getName(), peerId, e); + setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); + releaseLatch(); + return Flow.NO_MORE_STATE; + } catch (ReplicationException e) { + LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(), + peerId, e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.UPDATE_PEER_STORAGE); + return Flow.HAS_MORE_STATE; + case UPDATE_PEER_STORAGE: + try { + updatePeerStorage(env); + } catch (ReplicationException e) { + LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId, + e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.REFRESH_PEER_ON_RS); + return Flow.HAS_MORE_STATE; + case REFRESH_PEER_ON_RS: + // TODO: Need add child procedure for every RegionServer + setNextState(PeerModificationState.POST_PEER_MODIFICATION); + return Flow.HAS_MORE_STATE; + case POST_PEER_MODIFICATION: + try { + postPeerModification(env); + } catch (ReplicationException e) { + LOG.warn("{} failed to call postPeerModification for peer {}, retry", + getClass().getName(), peerId, e); + throw new ProcedureYieldException(); + } catch (IOException e) { + LOG.warn("{} failed to call post CP hook for peer {}, " + + "ignore since the procedure has already done", getClass().getName(), peerId, e); + } + releaseLatch(); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + private void releaseLatch() { + ProcedurePrepareLatch.releaseLatch(latch, this); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index bebf16c..52de755 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -2505,6 +2506,13 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, } @Override + public void preTransitReplicationPeerSyncReplicationState( + final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId, + SyncReplicationState clusterState) throws IOException { + requirePermission(ctx, "transitSyncReplicationPeerState", Action.ADMIN); + } + + @Override public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx, String regex) throws IOException { requirePermission(ctx, "listReplicationPeers", Action.ADMIN); http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index e471100..a7710e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -983,4 +984,65 @@ public class TestReplicationAdmin { // OK } } + + @Test + public void testTransitSyncReplicationPeerState() throws Exception { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + assertEquals(SyncReplicationState.NONE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); + + try { + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, + SyncReplicationState.DOWNGRADE_ACTIVE); + fail("Can't transit cluster state if replication peer don't config remote wal dir"); + } catch (Exception e) { + // OK + } + + String rootDir = "hdfs://srv1:9999/hbase"; + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + builder.setRemoteWALDir(rootDir); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + // Disable and enable peer don't affect SyncReplicationState + hbaseAdmin.disableReplicationPeer(ID_SECOND); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + hbaseAdmin.enableReplicationPeer(ID_SECOND); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); + assertEquals(SyncReplicationState.ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + try { + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, + SyncReplicationState.STANDBY); + fail("Can't transit cluster state from ACTIVE to STANDBY"); + } catch (Exception e) { + // OK + } + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, + SyncReplicationState.DOWNGRADE_ACTIVE); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY); + assertEquals(SyncReplicationState.STANDBY, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + try { + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); + fail("Can't transit cluster state from STANDBY to ACTIVE"); + } catch (Exception e) { + // OK + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 0256660..dce062c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -54,11 +54,10 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import com.google.protobuf.Service; - public class MockNoopMasterServices implements MasterServices { private final Configuration conf; private final MetricsMaster metricsMaster; @@ -486,4 +485,9 @@ public class MockNoopMasterServices implements MasterServices { public boolean isClusterUp() { return true; } + + public long transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState clusterState) throws ReplicationException, IOException { + return 0; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 08dd428..24b930c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -109,7 +110,8 @@ public class TestReplicationHFileCleaner { public void setup() throws ReplicationException, IOException { root = TEST_UTIL.getDataTestDirOnTestFS(); rp.getPeerStorage().addPeer(peerId, - ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true, + SyncReplicationState.NONE); rq.addPeerToHFileRefs(peerId); } http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index 77b4c1b..cbdee7f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -154,11 +154,13 @@ public class TestReplicationTrackerZKImpl { public void testPeerNameControl() throws Exception { int exists = 0; rp.getPeerStorage().addPeer("6", - ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true, + SyncReplicationState.NONE); try { rp.getPeerStorage().addPeer("6", - ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true, + SyncReplicationState.NONE); } catch (ReplicationException e) { if (e.getCause() instanceof KeeperException.NodeExistsException) { exists++; http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index eb46cd7..8170893 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -585,7 +586,7 @@ public abstract class TestReplicationSourceManager { private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, final boolean waitForSource) throws Exception { final ReplicationPeers rp = manager.getReplicationPeers(); - rp.getPeerStorage().addPeer(peerId, peerConfig, true); + rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE); try { manager.addPeer(peerId); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf2719/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 2e9be30..ac88ff5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.Permission.Action; @@ -2939,6 +2940,21 @@ public class TestAccessController extends SecureTestUtil { } @Test + public void testTransitSyncReplicationPeerState() throws Exception { + AccessTestAction action = new AccessTestAction() { + @Override + public Object run() throws Exception { + ACCESS_CONTROLLER.preTransitReplicationPeerSyncReplicationState( + ObserverContextImpl.createAndPrepare(CP_ENV), "test", SyncReplicationState.NONE); + return null; + } + }; + + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } + + @Test public void testListReplicationPeers() throws Exception { AccessTestAction action = new AccessTestAction() { @Override