HBASE-19973 Implement a procedure to replay sync replication wal for standby cluster
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d5858f6b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d5858f6b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d5858f6b Branch: refs/heads/HBASE-19064 Commit: d5858f6bbfec890b1879dbf49d4d5f7750bf5d13 Parents: 4bb4d52 Author: Guanghao Zhang <zg...@apache.org> Authored: Fri Mar 2 18:43:25 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Apr 11 14:56:43 2018 +0800 ---------------------------------------------------------------------- .../src/main/protobuf/MasterProcedure.proto | 22 +++ .../apache/hadoop/hbase/executor/EventType.java | 9 +- .../hadoop/hbase/executor/ExecutorType.java | 3 +- .../org/apache/hadoop/hbase/master/HMaster.java | 9 + .../hadoop/hbase/master/MasterServices.java | 6 + .../procedure/PeerProcedureInterface.java | 3 +- .../hbase/master/procedure/PeerQueue.java | 3 +- .../replication/RecoverStandbyProcedure.java | 114 +++++++++++ .../ReplaySyncReplicationWALManager.java | 139 +++++++++++++ .../ReplaySyncReplicationWALProcedure.java | 193 +++++++++++++++++++ .../hbase/regionserver/HRegionServer.java | 9 +- .../ReplaySyncReplicationWALCallable.java | 149 ++++++++++++++ .../SyncReplicationPeerInfoProviderImpl.java | 3 + .../org/apache/hadoop/hbase/util/FSUtils.java | 5 + .../hbase/master/MockNoopMasterServices.java | 8 +- .../master/TestRecoverStandbyProcedure.java | 186 ++++++++++++++++++ 16 files changed, 854 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index bd4b320..24ffc78 100644 --- 
a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -454,3 +454,25 @@ message TransitPeerSyncReplicationStateStateData { optional SyncReplicationState fromState = 1; required SyncReplicationState toState = 2; } + +enum RecoverStandbyState { + RENAME_SYNC_REPLICATION_WALS_DIR = 1; + INIT_WORKERS = 2; + DISPATCH_TASKS = 3; + REMOVE_SYNC_REPLICATION_WALS_DIR = 4; +} + +message RecoverStandbyStateData { + required string peer_id = 1; +} + +message ReplaySyncReplicationWALStateData { + required string peer_id = 1; + required string wal = 2; + optional ServerName target_server = 3; +} + +message ReplaySyncReplicationWALParameter { + required string peer_id = 1; + required string wal = 2; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 922deb8..ad38d1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -281,7 +281,14 @@ public enum EventType { * * RS_REFRESH_PEER */ - RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER); + RS_REFRESH_PEER(84, ExecutorType.RS_REFRESH_PEER), + + /** + * RS replay sync replication wal.<br> + * + * RS_REPLAY_SYNC_REPLICATION_WAL + */ + RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL); private final int code; private final ExecutorType executor; http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 7f130d1..ea97354 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -47,7 +47,8 @@ public enum ExecutorType { RS_REGION_REPLICA_FLUSH_OPS (28), RS_COMPACTED_FILES_DISCHARGER (29), RS_OPEN_PRIORITY_REGION (30), - RS_REFRESH_PEER (31); + RS_REFRESH_PEER(31), + RS_REPLAY_SYNC_REPLICATION_WAL(32); ExecutorType(int value) { } http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2809efa..4d3310c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; +import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; @@ -339,6 +340,8 @@ public class HMaster extends HRegionServer implements MasterServices { // manager of replication private ReplicationPeerManager replicationPeerManager; + private 
ReplaySyncReplicationWALManager replaySyncReplicationWALManager; + // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting // operations/debugging. @@ -828,6 +831,7 @@ public class HMaster extends HRegionServer implements MasterServices { initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); + this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this); // enable table descriptors cache this.tableDescriptors.setCacheOn(); @@ -3654,4 +3658,9 @@ public class HMaster extends HRegionServer implements MasterServices { public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() { return this.snapshotQuotaChore; } + + @Override + public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() { + return this.replaySyncReplicationWALManager; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 76aa2d6..c5b9200 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; 
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; @@ -465,6 +466,11 @@ public interface MasterServices extends Server { ReplicationPeerManager getReplicationPeerManager(); /** + * Returns the {@link ReplaySyncReplicationWALManager}. + */ + ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager(); + + /** * Update the peerConfig for the specified peer * @param peerId a short name that identifies the peer * @param peerConfig new config for the peer http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java index fc5348e..8ea49a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java @@ -25,7 +25,8 @@ import org.apache.yetus.audience.InterfaceStability; public interface PeerProcedureInterface { enum PeerOperationType { - ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE + ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE, + RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL } String getPeerId(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java index 1ae0c2f..25feb7e 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java @@ -49,6 +49,7 @@ class PeerQueue extends Queue<String> { } private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { - return proc.getPeerOperationType() != PeerOperationType.REFRESH; + return proc.getPeerOperationType() != PeerOperationType.REFRESH + && proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java new file mode 100644 index 0000000..e9e3a97 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState; + +@InterfaceAudience.Private +public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> { + + private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class); + + public RecoverStandbyProcedure() { + } + + public RecoverStandbyProcedure(String peerId) { + super(peerId); + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + ReplaySyncReplicationWALManager replaySyncReplicationWALManager = + env.getMasterServices().getReplaySyncReplicationWALManager(); + switch (state) { + case RENAME_SYNC_REPLICATION_WALS_DIR: + try { + replaySyncReplicationWALManager.renamePeerRemoteWALDir(peerId); + } catch (IOException e) { + LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e); + setFailure("master-recover-standby", e); + return Flow.NO_MORE_STATE; + } + setNextState(RecoverStandbyState.INIT_WORKERS); + return Flow.HAS_MORE_STATE; + case INIT_WORKERS: + replaySyncReplicationWALManager.initPeerWorkers(peerId); + setNextState(RecoverStandbyState.DISPATCH_TASKS); + return Flow.HAS_MORE_STATE; + case DISPATCH_TASKS: + addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream() + .map(wal -> new ReplaySyncReplicationWALProcedure(peerId, + 
replaySyncReplicationWALManager.removeWALRootPath(wal))) +
.toArray(ReplaySyncReplicationWALProcedure[]::new)); + setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR); + return Flow.HAS_MORE_STATE; + case REMOVE_SYNC_REPLICATION_WALS_DIR: + try { + replaySyncReplicationWALManager.removePeerReplayWALDir(peerId); + } catch (IOException e) { + LOG.warn("Failed to cleanup replay wals dir for peer id={}, retry", peerId, e); + throw new ProcedureYieldException(); + } + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager) + throws ProcedureYieldException { + try { + return replaySyncReplicationWALManager.getReplayWALs(peerId); + } catch (IOException e) { + LOG.warn("Failed to get replay wals for peer id={}, retry", peerId, e); + throw new ProcedureYieldException(); + } + } + + @Override + protected RecoverStandbyState getState(int stateId) { + return RecoverStandbyState.forNumber(stateId); + } + + @Override + protected int getStateId(RecoverStandbyState state) { + return state.getNumber(); + } + + @Override + protected RecoverStandbyState getInitialState() { + return RecoverStandbyState.RENAME_SYNC_REPLICATION_WALS_DIR; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.RECOVER_STANDBY; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java new file mode 100644 index 0000000..72f5c37 --- /dev/null +++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class ReplaySyncReplicationWALManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class); + + private static final String REPLAY_SUFFIX = "-replay"; +
+ private final MasterServices services; + + private final Configuration conf; + + private final FileSystem fs; + + private final Path walRootDir; + + private final Path remoteWALDir; + + private final Map<String, BlockingQueue<ServerName>> availServers = new ConcurrentHashMap<>(); + + public ReplaySyncReplicationWALManager(MasterServices services) { + this.services = services; + this.conf = services.getConfiguration(); + this.fs = services.getMasterFileSystem().getWALFileSystem(); + this.walRootDir = services.getMasterFileSystem().getWALRootDir(); + this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); + } + + public Path getPeerRemoteWALDir(String peerId) { + return new Path(this.remoteWALDir, peerId); + } + + private Path getPeerReplayWALDir(String peerId) { + return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX); + } + + public void createPeerRemoteWALDir(String peerId) throws IOException { + Path peerRemoteWALDir = getPeerRemoteWALDir(peerId); + if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { + throw new IOException("Unable to mkdir " + peerRemoteWALDir); + } + } + + public void renamePeerRemoteWALDir(String peerId) throws IOException { + Path peerRemoteWALDir = getPeerRemoteWALDir(peerId); + Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX); + if (fs.exists(peerRemoteWALDir)) { + if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) { + throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to " + + peerReplayWALDir + " for peer id=" + peerId); + } + LOG.info("Rename remote wal dir from {} to {} for peer id={}", peerRemoteWALDir, + peerReplayWALDir, peerId); + } else if (!fs.exists(peerReplayWALDir)) { + throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir " + + peerReplayWALDir + " not exist for peer id=" + peerId); + } + } + + public List<Path> getReplayWALs(String peerId) throws IOException { + Path peerReplayWALDir = getPeerReplayWALDir(peerId); +
List<Path> replayWals = new ArrayList<>(); + RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false); + while (iterator.hasNext()) { + replayWals.add(iterator.next().getPath()); + } + return replayWals; + } + + public void removePeerReplayWALDir(String peerId) throws IOException { + Path peerReplayWALDir = getPeerReplayWALDir(peerId); + if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) { + throw new IOException( + "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); + } + } + + public void initPeerWorkers(String peerId) { + availServers.put(peerId, newWorkerQueue()); + } + + public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit) + throws InterruptedException { + return availServers.computeIfAbsent(peerId, k -> newWorkerQueue()).poll(timeout, unit); + } + + public void addAvailServer(String peerId, ServerName server) { + availServers.computeIfAbsent(peerId, k -> newWorkerQueue()).offer(server); + } + + // Queues live only in master memory; computeIfAbsent rebuilds one lost to a master restart. + private BlockingQueue<ServerName> newWorkerQueue() { + return new LinkedBlockingQueue<>(services.getServerManager().getOnlineServers().keySet()); + } +
+ public String removeWALRootPath(Path path) { + return path.toString().substring(walRootDir.toString().length() + 1); // remove the "/" too + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java new file mode 100644 index 0000000..8d8a65a --- /dev/null +++
+ */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData; + +@InterfaceAudience.Private +public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedureEnv> + implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class); + + private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000; + + private String peerId; + + private ServerName targetServer = null; + + private String wal; + + private boolean dispatched; + + private 
ProcedureEvent<?> event; + + private boolean succ; + + public ReplaySyncReplicationWALProcedure() { + } + + public ReplaySyncReplicationWALProcedure(String peerId, String wal) { + this.peerId = peerId; + this.wal = wal; + } + + @Override + public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) { + return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class, + ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build() + .toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + ReplaySyncReplicationWALManager replaySyncReplicationWALManager = + env.getMasterServices().getReplaySyncReplicationWALManager(); + if (error != null) { + LOG.warn("Replay sync replication wal {} on {} failed for peer id={}", wal, targetServer, + peerId, error); + this.succ = false; + } else { + LOG.warn("Replay sync replication wal {} on {} suceeded for peer id={}", wal, targetServer, + peerId); + this.succ = true; + replaySyncReplicationWALManager.addAvailServer(peerId, targetServer); + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + // retry + dispatched = false; + } + + // Try poll a available server + if 
(targetServer == null) { + targetServer = env.getMasterServices().getReplaySyncReplicationWALManager() + .getAvailServer(peerId, DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT, TimeUnit.MILLISECONDS); + if (targetServer == null) { + LOG.info("No available server to replay wal {} for peer id={}, retry", wal, peerId); + throw new ProcedureYieldException(); + } + } + + // Dispatch task to target server + if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) { + LOG.info( + "Can not add remote operation for replay wal {} on {} for peer id={}, " + + "this usually because the server is already dead, " + "retry", + wal, targetServer, peerId); + targetServer = null; + throw new ProcedureYieldException(); + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + ReplaySyncReplicationWALStateData.Builder builder = + ReplaySyncReplicationWALStateData.newBuilder().setPeerId(peerId).setWal(wal); + if (targetServer != null) { + builder.setTargetServer(ProtobufUtil.toServerName(targetServer)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + ReplaySyncReplicationWALStateData data = + serializer.deserialize(ReplaySyncReplicationWALStateData.class); + peerId = data.getPeerId(); + wal = data.getWal(); + if (data.hasTargetServer()) { + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { 
+ return PeerOperationType.REPLAY_SYNC_REPLICATION_WAL; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index af7b1e8..f8e2105 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1927,6 +1927,11 @@ public class HRegionServer extends HasThread implements this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); + if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) { + this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL, + conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 2)); + } + Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); @@ -2874,14 +2879,14 @@ public class HRegionServer extends HasThread implements /** * @return Return the walRootDir. */ - protected Path getWALRootDir() { + public Path getWALRootDir() { return walRootDir; } /** * @return Return the walFs. 
*/ - protected FileSystem getWALFileSystem() { + public FileSystem getWALFileSystem() { return walFs; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java new file mode 100644 index 0000000..8dfe3a2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; + +/** + * This callable executed at RS side to replay sync replication wal.
+ */ +@InterfaceAudience.Private +public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { + + private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class); + + private static final String REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = + "hbase.replay.sync.replication.wal.batch.size"; + + private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024; + + private HRegionServer rs; + + private FileSystem fs; + + private Configuration conf; + + private String peerId; + + private String wal; + + private Exception initError; + + private long batchSize; + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId); + try (Reader reader = getReader()) { + List<Entry> entries = readWALEntries(reader); + while (!entries.isEmpty()) { + Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil + .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); + HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond()); + rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst()); + entries = readWALEntries(reader); + } + } + return null; + } + + @Override + public void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + this.fs = rs.getWALFileSystem(); + this.conf = rs.getConfiguration(); + try { + ReplaySyncReplicationWALParameter param = + ReplaySyncReplicationWALParameter.parseFrom(parameter); + this.peerId = param.getPeerId(); + this.wal = param.getWal(); + this.batchSize = conf.getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE, + DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE); + } catch (InvalidProtocolBufferException e) { + initError = e; + } + } + + @Override + public EventType getEventType() { + return EventType.RS_REPLAY_SYNC_REPLICATION_WAL; + } + + private Reader
getReader() throws IOException { + Path path = new Path(rs.getWALRootDir(), wal); + long length = fs.getFileStatus(path).getLen(); + try { + FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf); + return WALFactory.createReader(fs, path, conf); + } catch (EOFException e) { + if (length <= 0) { + LOG.warn("File is empty. Could not open {} for reading because {}", path, e); + return null; + } + throw e; + } + } + + private List<Entry> readWALEntries(Reader reader) throws IOException { + List<Entry> entries = new ArrayList<>(); + if (reader == null) { + return entries; + } + long size = 0; + Entry entry = reader.next(); + while (entry != null) { + entries.add(entry); + size += entry.getEdit().heapSize(); + if (size > batchSize) { + break; + } + entry = reader.next(); + } + return entries; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java index 973e049..e4afc33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -41,6 +41,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv @Override public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { + if (info == null) { + return Optional.empty(); + } String peerId = mapping.getPeerId(info); if (peerId == null) { return Optional.empty();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index b106a31..178447e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -937,6 +937,11 @@ public abstract class FSUtils extends CommonFSUtils { } } + /** Recovers the lease on {@code p} by delegating to the four-argument overload with {@code null} as its last argument. */ public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf) + throws IOException { + recoverFileLease(fs, p, conf, null); + } + /** * Recover file lease. Used when a file might be suspect * to be had been left open by another process. http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index dce062c..60132a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import
org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; @@ -490,4 +491,9 @@ public class MockNoopMasterServices implements MasterServices { SyncReplicationState clusterState) throws ReplicationException, IOException { return 0; } -} + + @Override + public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d5858f6b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java new file mode 100644 index 0000000..817f03d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.master; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure; +import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({MasterTests.class, LargeTests.class}) +public class TestRecoverStandbyProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRecoverStandbyProcedure.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestRecoverStandbyProcedure.class); + + private static final TableName tableName = TableName.valueOf("TestRecoverStandbyProcedure"); + + private static final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); + + private static final byte[] family = Bytes.toBytes("CF"); + + private static final byte[] qualifier = Bytes.toBytes("q"); + + private static final long timestamp = System.currentTimeMillis(); + + private static final int ROW_COUNT = 1000; + + private static final int WAL_NUMBER = 10; + + private static final int RS_NUMBER = 3; + + private static final String PEER_ID = "1"; + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static ReplaySyncReplicationWALManager replaySyncReplicationWALManager; + + private static ProcedureExecutor<MasterProcedureEnv> procExec; + + private static FileSystem fs; + + private static Configuration conf; + + @BeforeClass + public static void setupCluster() throws Exception { + UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true); + UTIL.startMiniCluster(RS_NUMBER); + UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + conf = UTIL.getConfiguration(); + HMaster master = UTIL.getHBaseCluster().getMaster(); + fs = master.getMasterFileSystem().getWALFileSystem(); + replaySyncReplicationWALManager = master.getReplaySyncReplicationWALManager(); + procExec = master.getMasterProcedureExecutor(); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure 
shutting down cluster", e); + } + } + + @Before + public void setupBeforeTest() throws IOException { + UTIL.createTable(tableName, family); + } + + @After + public void tearDownAfterTest() throws IOException { + try (Admin admin = UTIL.getAdmin()) { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + } + admin.deleteTable(tableName); + } + } + + @Test + public void testRecoverStandby() throws IOException, StreamLacksCapabilityException { + setupSyncReplicationWALs(); + long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID)); + ProcedureTestingUtility.waitProcedure(procExec, procId); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < WAL_NUMBER * ROW_COUNT; i++) { + Result result = table.get(new Get(Bytes.toBytes(i)).setTimeStamp(timestamp)); + assertNotNull(result); + assertEquals(i, Bytes.toInt(result.getValue(family, qualifier))); + } + } + } + + private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException { + Path peerRemoteWALDir = replaySyncReplicationWALManager.getPeerRemoteWALDir(PEER_ID); + if (!fs.exists(peerRemoteWALDir)) { + fs.mkdirs(peerRemoteWALDir); + } + for (int i = 0; i < WAL_NUMBER; i++) { + try (ProtobufLogWriter writer = new ProtobufLogWriter()) { + Path wal = new Path(peerRemoteWALDir, "srv1,8888." 
+ i + ".syncrep"); + writer.init(fs, wal, conf, true); + List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT); + for (Entry entry : entries) { + writer.append(entry); + } + writer.sync(false); + LOG.info("Created wal {} to replay for peer id={}", wal, PEER_ID); + } + } + } + + private List<Entry> setupWALEntries(int startRow, int endRow) { + return IntStream.range(startRow, endRow) + .mapToObj(i -> createWALEntry(Bytes.toBytes(i), Bytes.toBytes(i))) + .collect(Collectors.toList()); + } + + private Entry createWALEntry(byte[] row, byte[] value) { + WALKeyImpl key = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 1); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(row, family, qualifier, timestamp, value)); + return new Entry(key, edit); + } +}