HBASE-19864 Use protobuf instead of enum.ordinal to store SyncReplicationState
Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8a85bfd Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8a85bfd Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8a85bfd Branch: refs/heads/HBASE-19064 Commit: e8a85bfd6b7af1d62114acac354c9e3595447190 Parents: 6af83cb Author: Guanghao Zhang <zg...@apache.org> Authored: Fri Jan 26 16:50:48 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Mar 27 17:30:56 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerConfigUtil.java | 22 ++++++++++++++--- .../hbase/replication/SyncReplicationState.java | 17 +++++++++++++ .../hbase/shaded/protobuf/RequestConverter.java | 7 +++--- .../src/main/protobuf/Replication.proto | 13 ++++++---- .../replication/ZKReplicationPeerStorage.java | 25 +++++++++----------- .../hadoop/hbase/master/MasterRpcServices.java | 9 ++++--- ...ransitPeerSyncReplicationStateProcedure.java | 9 ++++--- .../TestReplicationSourceManager.java | 2 +- 8 files changed, 67 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 6cbe05b..331795c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -403,7 +403,7 @@ public final class ReplicationPeerConfigUtil { ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState(); ReplicationPeerConfig config = convert(desc.getConfig()); return new ReplicationPeerDescription(desc.getId(), enabled, config, - SyncReplicationState.valueOf(desc.getSyncReplicationState().getNumber())); + toSyncReplicationState(desc.getSyncReplicationState())); } public static ReplicationProtos.ReplicationPeerDescription @@ -411,17 +411,33 @@ public final class ReplicationPeerConfigUtil { ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription.newBuilder(); builder.setId(desc.getPeerId()); + ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState.newBuilder(); stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED : ReplicationProtos.ReplicationState.State.DISABLED); builder.setState(stateBuilder.build()); + builder.setConfig(convert(desc.getPeerConfig())); - builder.setSyncReplicationState( - ReplicationProtos.SyncReplicationState.forNumber(desc.getSyncReplicationState().ordinal())); + builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState())); + return builder.build(); } + public static ReplicationProtos.SyncReplicationState + toSyncReplicationState(SyncReplicationState state) { + ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder = + ReplicationProtos.SyncReplicationState.newBuilder(); + syncReplicationStateBuilder + .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal())); + return syncReplicationStateBuilder.build(); + } + + public static SyncReplicationState + toSyncReplicationState(ReplicationProtos.SyncReplicationState state) { + return SyncReplicationState.valueOf(state.getState().getNumber()); + } + public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig( Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) { ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java index bd144e9..a65b144 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java @@ -17,8 +17,15 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.Arrays; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + /** * Used by synchronous replication. Indicate the state of the current cluster in a synchronous * replication peer. The state may be one of {@link SyncReplicationState#ACTIVE}, @@ -45,4 +52,14 @@ public enum SyncReplicationState { throw new IllegalArgumentException("Unknown synchronous replication state " + value); } } + + public static byte[] toByteArray(SyncReplicationState state) { + return ProtobufUtil + .prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray()); + } + + public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException { + return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState + .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length))); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 4e67b78..8b7bc38 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1871,10 +1871,9 @@ public final class RequestConverter { } public static TransitReplicationPeerSyncReplicationStateRequest - buildTransitReplicationPeerSyncReplicationStateRequest(String peerId, - SyncReplicationState state) { + buildTransitReplicationPeerSyncReplicationStateRequest(String peerId, + SyncReplicationState state) { return TransitReplicationPeerSyncReplicationStateRequest.newBuilder().setPeerId(peerId) - .setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal())) - .build(); + .setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-protocol-shaded/src/main/protobuf/Replication.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 3564ae4..61ba131 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -66,11 +66,14 @@ message ReplicationState { /** * Indicate the state of the current cluster in a synchronous replication peer. */ -enum SyncReplicationState { - NONE = 0; - ACTIVE = 1; - DOWNGRADE_ACTIVE = 2; - STANDBY = 3; +message SyncReplicationState { + enum State { + NONE = 0; + ACTIVE = 1; + DOWNGRADE_ACTIVE = 2; + STANDBY = 3; + } + required State state = 1; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index 338ce3f..909daa0 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; /** @@ -96,7 +95,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES), ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId), - Bytes.toBytes(syncReplicationState.ordinal()))); + SyncReplicationState.toByteArray(syncReplicationState))); try { ZKUtil.createWithParents(zookeeper, peersZNode); ZKUtil.multiOrSequential(zookeeper, multiOps, false); @@ -179,29 +178,27 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase } @Override - public void setPeerSyncReplicationState(String peerId, SyncReplicationState clusterState) + public void setPeerSyncReplicationState(String peerId, SyncReplicationState state) throws ReplicationException { - byte[] clusterStateBytes = Bytes.toBytes(clusterState.ordinal()); try { - ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), clusterStateBytes); + ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), + SyncReplicationState.toByteArray(state)); } catch (KeeperException e) { throw new ReplicationException( - "Unable to change the cluster state for the synchronous replication peer with id=" + - peerId, - e); + "Unable to change the cluster state for the synchronous replication peer with id=" + peerId, + e); } } @Override public SyncReplicationState getPeerSyncReplicationState(String peerId) throws ReplicationException { - byte[] data; try { - data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId)); - } catch (KeeperException | InterruptedException e) { + byte[] data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId)); + return SyncReplicationState.parseFrom(data); + } catch (KeeperException | InterruptedException | IOException e) { throw new ReplicationException( - "Error getting cluster state for the synchronous replication peer with id=" + peerId, e); + "Error getting cluster state for the synchronous replication peer with id=" + peerId, e); } - return SyncReplicationState.valueOf(Bytes.toInt(data)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 5de5681..b502c16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -85,7 +85,6 @@ import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.AccessController; @@ -1947,13 +1946,13 @@ public class MasterRpcServices extends RSRpcServices @Override public TransitReplicationPeerSyncReplicationStateResponse - transitReplicationPeerSyncReplicationState(RpcController controller, - TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { + transitReplicationPeerSyncReplicationState(RpcController controller, + TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { try { long procId = master.transitReplicationPeerSyncReplicationState(request.getPeerId(), - SyncReplicationState.valueOf(request.getSyncReplicationState().getNumber())); + ReplicationPeerConfigUtil.toSyncReplicationState(request.getSyncReplicationState())); return TransitReplicationPeerSyncReplicationStateResponse.newBuilder().setProcId(procId) - .build(); + .build(); } catch (ReplicationException | IOException e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index d26eecc..aad3b06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; /** * The procedure for transit current cluster state for a synchronous replication peer. @@ -89,16 +89,15 @@ public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedur protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { super.serializeStateData(serializer); serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder() - .setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal())) - .build()); + .setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build()); } @Override protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { super.deserializeStateData(serializer); TransitPeerSyncReplicationStateStateData data = - serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); - state = SyncReplicationState.valueOf(data.getSyncReplicationState().getNumber()); + serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); + state = ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a85bfd/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 04c7aad..b058da3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -175,7 +175,7 @@ public abstract class TestReplicationSourceManager { ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state"); ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state", - Bytes.toBytes(SyncReplicationState.NONE.ordinal())); + SyncReplicationState.toByteArray(SyncReplicationState.NONE)); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);