HBASE-20569 NPE in RecoverStandbyProcedure.execute
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44ca13fe Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44ca13fe Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44ca13fe Branch: refs/heads/master Commit: 44ca13fe07dc5050a2bc98ccd3f65953f06aaef8 Parents: 7448b04 Author: Guanghao Zhang <zg...@apache.org> Authored: Thu May 31 20:54:59 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Jun 28 18:08:43 2018 +0800 ---------------------------------------------------------------------- .../src/main/protobuf/MasterProcedure.proto | 26 ++- .../org/apache/hadoop/hbase/master/HMaster.java | 10 +- .../hadoop/hbase/master/MasterServices.java | 6 +- .../procedure/MasterProcedureScheduler.java | 3 +- .../procedure/PeerProcedureInterface.java | 2 +- .../hbase/master/procedure/PeerQueue.java | 3 +- .../replication/RecoverStandbyProcedure.java | 68 ++++-- .../master/replication/RemovePeerProcedure.java | 5 +- .../ReplaySyncReplicationWALManager.java | 169 -------------- .../ReplaySyncReplicationWALProcedure.java | 196 ----------------- .../SyncReplicationReplayWALManager.java | 218 +++++++++++++++++++ .../SyncReplicationReplayWALProcedure.java | 164 ++++++++++++++ ...SyncReplicationReplayWALRemoteProcedure.java | 213 ++++++++++++++++++ ...ransitPeerSyncReplicationStateProcedure.java | 6 +- ...ZKSyncReplicationReplayWALWorkerStorage.java | 108 +++++++++ .../ReplaySyncReplicationWALCallable.java | 46 ++-- .../hbase/master/MockNoopMasterServices.java | 4 +- .../replication/SyncReplicationTestBase.java | 6 +- .../TestSyncReplicationStandbyKillMaster.java | 88 ++++++++ .../TestSyncReplicationStandbyKillRS.java | 119 ++++++++++ .../master/TestRecoverStandbyProcedure.java | 10 +- 21 files changed, 1040 insertions(+), 430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 23ec8f8..a062e9a 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -486,22 +486,34 @@ message TransitPeerSyncReplicationStateStateData { enum RecoverStandbyState { RENAME_SYNC_REPLICATION_WALS_DIR = 1; - INIT_WORKERS = 2; - DISPATCH_TASKS = 3; - SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4; + REGISTER_PEER_TO_WORKER_STORAGE = 2; + DISPATCH_WALS = 3; + UNREGISTER_PEER_FROM_WORKER_STORAGE = 4; + SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 5; +} + +enum SyncReplicationReplayWALState { + ASSIGN_WORKER = 1; + DISPATCH_WALS_TO_WORKER = 2; + RELEASE_WORKER = 3; } message RecoverStandbyStateData { + required bool serial = 1; +} + +message SyncReplicationReplayWALStateData { required string peer_id = 1; + repeated string wal = 2; } -message ReplaySyncReplicationWALStateData { +message SyncReplicationReplayWALRemoteStateData { required string peer_id = 1; - required string wal = 2; - optional ServerName target_server = 3; + repeated string wal = 2; + required ServerName target_server = 3; } message ReplaySyncReplicationWALParameter { required string peer_id = 1; - required string wal = 2; + repeated string wal = 2; } http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2c23e85..dc62752 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -343,7 +343,7 @@ public class HMaster extends HRegionServer implements MasterServices { // manager of replication private ReplicationPeerManager replicationPeerManager; - private ReplaySyncReplicationWALManager replaySyncReplicationWALManager; + private SyncReplicationReplayWALManager syncReplicationReplayWALManager; // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting @@ -754,6 +754,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.splitOrMergeTracker.start(); this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); + this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this); this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker.start(); @@ -852,7 +853,6 @@ public class HMaster extends HRegionServer implements MasterServices { initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); - this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this); // enable table descriptors cache this.tableDescriptors.setCacheOn(); @@ -3764,7 +3764,7 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() { - return this.replaySyncReplicationWALManager; + public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() { + return this.syncReplicationReplayWALManager; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 6034ff7..7b0c56a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -462,9 +462,9 @@ public interface MasterServices extends Server { ReplicationPeerManager getReplicationPeerManager(); /** - * Returns the {@link ReplaySyncReplicationWALManager}. + * Returns the {@link SyncReplicationReplayWALManager}. */ - ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager(); + SyncReplicationReplayWALManager getSyncReplicationReplayWALManager(); /** * Update the peerConfig for the specified peer http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 1420986..8a28b84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -207,7 +207,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state Procedure<?> nextProc = rq.peek(); - if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) { + if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult) + && nextProc.getRootProcId() != pollResult.getRootProcId()) { removeFromRunQueue(fairq, rq); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java index 76b5163..0195ab9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java @@ -24,7 +24,7 @@ public interface PeerProcedureInterface { enum PeerOperationType { ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE, - RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL + RECOVER_STANDBY, SYNC_REPLICATION_REPLAY_WAL, SYNC_REPLICATION_REPLAY_WAL_REMOTE } String getPeerId(); http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java index 25feb7e..86d8e43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java @@ -50,6 +50,7 @@ class PeerQueue extends Queue<String> { private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { return proc.getPeerOperationType() != PeerOperationType.REFRESH - && proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL; + && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL + && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java index 9860774..5494742 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java @@ -18,60 +18,79 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData; @InterfaceAudience.Private public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> { private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class); + private boolean serial; + public RecoverStandbyProcedure() { } - public RecoverStandbyProcedure(String peerId) { + public RecoverStandbyProcedure(String peerId, boolean serial) { super(peerId); + this.serial = serial; } @Override protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { - ReplaySyncReplicationWALManager replaySyncReplicationWALManager = - env.getMasterServices().getReplaySyncReplicationWALManager(); + SyncReplicationReplayWALManager syncReplicationReplayWALManager = + env.getMasterServices().getSyncReplicationReplayWALManager(); switch (state) { case RENAME_SYNC_REPLICATION_WALS_DIR: try { - replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId); + syncReplicationReplayWALManager.renameToPeerReplayWALDir(peerId); } catch (IOException e) { LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e); setFailure("master-recover-standby", e); return Flow.NO_MORE_STATE; } - setNextState(RecoverStandbyState.INIT_WORKERS); + setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE); return Flow.HAS_MORE_STATE; - case INIT_WORKERS: - replaySyncReplicationWALManager.initPeerWorkers(peerId); - setNextState(RecoverStandbyState.DISPATCH_TASKS); + case REGISTER_PEER_TO_WORKER_STORAGE: + try { + syncReplicationReplayWALManager.registerPeer(peerId); + } catch (ReplicationException e) { + LOG.warn("Failed to register peer to worker storage for peer id={}, retry", peerId, e); + throw new ProcedureYieldException(); + } + setNextState(RecoverStandbyState.DISPATCH_WALS); return Flow.HAS_MORE_STATE; - case DISPATCH_TASKS: - addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream() - .map(wal -> new ReplaySyncReplicationWALProcedure(peerId, - replaySyncReplicationWALManager.removeWALRootPath(wal))) - .toArray(ReplaySyncReplicationWALProcedure[]::new)); + case DISPATCH_WALS: + dispathWals(syncReplicationReplayWALManager); + setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE); + return Flow.HAS_MORE_STATE; + case UNREGISTER_PEER_FROM_WORKER_STORAGE: + try { + syncReplicationReplayWALManager.unregisterPeer(peerId); + } catch (ReplicationException e) { + LOG.warn("Failed to unregister peer from worker storage for peer id={}, retry", peerId, + e); + throw new ProcedureYieldException(); + } setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR); return Flow.HAS_MORE_STATE; case SNAPSHOT_SYNC_REPLICATION_WALS_DIR: try { - replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId); + syncReplicationReplayWALManager.renameToPeerSnapshotWALDir(peerId); } catch (IOException e) { LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); @@ -82,10 +101,14 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb } } - private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager) + // TODO: dispatch wals by region server when serial is true and sort wals + private void dispathWals(SyncReplicationReplayWALManager syncReplicationReplayWALManager) throws ProcedureYieldException { try { - return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId); + List<Path> wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId); + addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId, + Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal)))) + .toArray(SyncReplicationReplayWALProcedure[]::new)); } catch (IOException e) { LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); @@ -111,4 +134,17 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb public PeerOperationType getPeerOperationType() { return PeerOperationType.RECOVER_STANDBY; } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(RecoverStandbyStateData.newBuilder().setSerial(serial).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + RecoverStandbyStateData data = serializer.deserialize(RecoverStandbyStateData.class); + serial = data.getSerial(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 254448a..4b77c8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -67,11 +67,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { } private void removeRemoteWALs(MasterProcedureEnv env) throws IOException { - env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId); + env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId); } - @Override - protected void postPeerModification(MasterProcedureEnv env) + @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException, ReplicationException { if (peerConfig.isSyncReplication()) { removeRemoteWALs(env); http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java deleted file mode 100644 index 348c134..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.replication; - -import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir; -import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir; -import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.replication.ReplicationUtils; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - -@InterfaceAudience.Private -public class ReplaySyncReplicationWALManager { - - private static final Logger LOG = - LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class); - - private final MasterServices services; - - private final FileSystem fs; - - private final Path walRootDir; - - private final Path remoteWALDir; - - private final Map<String, BlockingQueue<ServerName>> availServers = new HashMap<>(); - - public ReplaySyncReplicationWALManager(MasterServices services) { - this.services = services; - this.fs = services.getMasterFileSystem().getWALFileSystem(); - this.walRootDir = services.getMasterFileSystem().getWALRootDir(); - this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); - } - - public void createPeerRemoteWALDir(String peerId) throws IOException { - Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId); - if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { - throw new IOException("Unable to mkdir " + peerRemoteWALDir); - } - } - - private void rename(Path src, Path dst, String peerId) throws IOException { - if (fs.exists(src)) { - deleteDir(dst, peerId); - if (!fs.rename(src, dst)) { - throw new IOException( - "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); - } - LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId); - } else if (!fs.exists(dst)) { - throw new IOException( - "Want to rename from " + src + " to " + dst + ", but they both do not exist"); - } - } - - public void renameToPeerReplayWALDir(String peerId) throws IOException { - rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId), - peerId); - } - - public void renameToPeerSnapshotWALDir(String peerId) throws IOException { - rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId), - peerId); - } - - public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException { - Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId); - for (FileStatus status : fs.listStatus(peerReplayWALDir, - p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) { - Path src = status.getPath(); - String srcName = src.getName(); - String dstName = - srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); - FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName)); - } - List<Path> wals = new ArrayList<>(); - for (FileStatus status : fs.listStatus(peerReplayWALDir)) { - Path path = status.getPath(); - if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) { - wals.add(path); - } else { - if (!fs.delete(path, true)) { - LOG.warn("Can not delete unused file: " + path); - } - } - } - return wals; - } - - public void snapshotPeerReplayWALDir(String peerId) throws IOException { - Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId); - if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) { - throw new IOException( - "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); - } - } - - private void deleteDir(Path dir, String peerId) throws IOException { - if (!fs.delete(dir, true) && fs.exists(dir)) { - throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId); - } - } - - public void removePeerRemoteWALs(String peerId) throws IOException { - deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId); - deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId); - deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId); - } - - public void initPeerWorkers(String peerId) { - BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>(); - services.getServerManager().getOnlineServers().keySet() - .forEach(server -> servers.offer(server)); - availServers.put(peerId, servers); - } - - public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit) - throws InterruptedException { - return availServers.get(peerId).poll(timeout, unit); - } - - public void addAvailServer(String peerId, ServerName server) { - availServers.get(peerId).offer(server); - } - - public String removeWALRootPath(Path path) { - String pathStr = path.toString(); - // remove the "/" too. - return pathStr.substring(walRootDir.toString().length() + 1); - } - - @VisibleForTesting - public Path getRemoteWALDir() { - return remoteWALDir; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java deleted file mode 100644 index 77fd24d..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.replication; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; -import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; -import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureEvent; -import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; -import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; -import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; -import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; -import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData; - -@InterfaceAudience.Private -public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedureEnv> - implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface { - - private static final Logger LOG = - LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class); - - private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000; - - private String peerId; - - private ServerName targetServer = null; - - private String wal; - - private boolean dispatched; - - private ProcedureEvent<?> event; - - private boolean succ; - - public ReplaySyncReplicationWALProcedure() { - } - - public ReplaySyncReplicationWALProcedure(String peerId, String wal) { - this.peerId = peerId; - this.wal = wal; - } - - @Override - public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) { - return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class, - ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build() - .toByteArray()); - } - - @Override - public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) { - complete(env, exception); - } - - @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { - complete(env, null); - } - - @Override - public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { - complete(env, error); - } - - private void complete(MasterProcedureEnv env, Throwable error) { - if (event == null) { - LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", - getProcId()); - return; - } - ReplaySyncReplicationWALManager replaySyncReplicationWALManager = - env.getMasterServices().getReplaySyncReplicationWALManager(); - if (error != null) { - LOG.warn("Replay sync replication wal {} on {} failed for peer id={}", wal, targetServer, - peerId, error); - this.succ = false; - } else { - LOG.warn("Replay sync replication wal {} on {} suceeded for peer id={}", wal, targetServer, - peerId); - this.succ = true; - replaySyncReplicationWALManager.addAvailServer(peerId, targetServer); - } - event.wake(env.getProcedureScheduler()); - event = null; - } - - @Override - protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { - if (succ) { - return null; - } - // retry - dispatched = false; - } - - // Try poll a available server - if (targetServer == null) { - targetServer = env.getMasterServices().getReplaySyncReplicationWALManager() - .getAvailServer(peerId, DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT, TimeUnit.MILLISECONDS); - if (targetServer == null) { - LOG.info("No available server to replay wal {} for peer id={}, retry", wal, peerId); - throw new ProcedureYieldException(); - } - } - - // Dispatch task to target server - try { - env.getRemoteDispatcher().addOperationToNode(targetServer, this); - } catch (FailedRemoteDispatchException e) { - LOG.info( - "Can not add remote operation for replay wal {} on {} for peer id={}, " + - "this usually because the server is already dead, " + "retry", - wal, targetServer, peerId, e); - targetServer = null; - throw new ProcedureYieldException(); - } - dispatched = true; - event = new ProcedureEvent<>(this); - event.suspendIfNotReady(this); - throw new ProcedureSuspendedException(); - } - - @Override - protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - protected boolean abort(MasterProcedureEnv env) { - return false; - } - - @Override - protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - ReplaySyncReplicationWALStateData.Builder builder = - ReplaySyncReplicationWALStateData.newBuilder().setPeerId(peerId).setWal(wal); - if (targetServer != null) { - builder.setTargetServer(ProtobufUtil.toServerName(targetServer)); - } - serializer.serialize(builder.build()); - } - - @Override - protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { - ReplaySyncReplicationWALStateData data = - serializer.deserialize(ReplaySyncReplicationWALStateData.class); - peerId = data.getPeerId(); - wal = data.getWal(); - if (data.hasTargetServer()) { - targetServer = ProtobufUtil.toServerName(data.getTargetServer()); - } - } - - @Override - public String getPeerId() { - return peerId; - } - - @Override - public PeerOperationType getPeerOperationType() { - return PeerOperationType.REPLAY_SYNC_REPLICATION_WAL; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java new file mode 100644 index 0000000..377c9f1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.apache.hadoop.hbase.replication.ReplicationUtils.REMOTE_WAL_REPLAY_SUFFIX; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Private +public class SyncReplicationReplayWALManager { + + private static final Logger LOG = + LoggerFactory.getLogger(SyncReplicationReplayWALManager.class); + + private final MasterServices services; + + private final FileSystem fs; + + private final Path walRootDir; + + private final Path remoteWALDir; + + private final ZKSyncReplicationReplayWALWorkerStorage workerStorage; + + private final Map<String, Set<ServerName>> workers = new HashMap<>(); + + private final Object workerLock = new Object(); + + public SyncReplicationReplayWALManager(MasterServices services) + throws IOException, ReplicationException { + this.services = services; + this.fs = services.getMasterFileSystem().getWALFileSystem(); + this.walRootDir = services.getMasterFileSystem().getWALRootDir(); + this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); + this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(), + services.getConfiguration()); + checkReplayingWALDir(); + } + + private void checkReplayingWALDir() throws IOException, ReplicationException { + FileStatus[] files = fs.listStatus(remoteWALDir); + for (FileStatus file : files) { + String name = file.getPath().getName(); + if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) { + String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length()); + workers.put(peerId, workerStorage.getPeerWorkers(peerId)); + } + } + } + + public void registerPeer(String peerId) throws ReplicationException { + workers.put(peerId, new HashSet<>()); + workerStorage.addPeer(peerId); + } + + public void unregisterPeer(String peerId) throws ReplicationException { + workers.remove(peerId); + workerStorage.removePeer(peerId); + } + + public ServerName getPeerWorker(String peerId) throws ReplicationException { + Optional<ServerName> worker = Optional.empty(); + ServerName workerServer = null; + synchronized (workerLock) { + worker = services.getServerManager().getOnlineServers().keySet().stream() + .filter(server -> !workers.get(peerId).contains(server)).findFirst(); + if (worker.isPresent()) { + workerServer = worker.get(); + workers.get(peerId).add(workerServer); + } + } + if (workerServer != null) { + workerStorage.addPeerWorker(peerId, workerServer); + } + return workerServer; + } + + public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException { + synchronized (workerLock) { + workers.get(peerId).remove(worker); + } + workerStorage.removePeerWorker(peerId, worker); + } + public void createPeerRemoteWALDir(String peerId) throws IOException { + Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId); + if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { + throw new IOException("Unable to mkdir " + peerRemoteWALDir); + } + } + + private void rename(Path src, Path dst, String peerId) throws IOException { + if (fs.exists(src)) { + deleteDir(dst, peerId); + if (!fs.rename(src, dst)) { + throw new IOException( + "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); + } + LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId); + } else if (!fs.exists(dst)) { + throw new IOException( + "Want to rename from " + src + " to " + dst + ", but they both do not exist"); + } + } + + public void renameToPeerReplayWALDir(String peerId) throws IOException { + rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId), + peerId); + } + + public void renameToPeerSnapshotWALDir(String peerId) throws IOException { + rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId), + peerId); + } + + public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException { + Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId); + for (FileStatus status : fs.listStatus(peerReplayWALDir, + p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) { + Path src = status.getPath(); + String srcName = src.getName(); + String dstName = + srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); + FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName)); + } + List<Path> wals = new ArrayList<>(); + for (FileStatus status : fs.listStatus(peerReplayWALDir)) { + Path path = status.getPath(); + if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) { + wals.add(path); + } else { + if (!fs.delete(path, true)) { + LOG.warn("Can not delete unused file: " + path); + } + } + } + return wals; + } + + public void snapshotPeerReplayWALDir(String peerId) throws IOException { + Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId); + if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) { + throw new IOException( + "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); + } + } + + private void deleteDir(Path dir, String peerId) throws IOException { + if (!fs.delete(dir, true) && fs.exists(dir)) { + throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId); + } + } + + public void removePeerRemoteWALs(String peerId) throws IOException { + deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId); + deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId); + deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId); + } + + public String removeWALRootPath(Path path) { + String pathStr = path.toString(); + // remove the "/" too. + return pathStr.substring(walRootDir.toString().length() + 1); + } + + public void finishReplayWAL(String wal) throws IOException { + Path walPath = new Path(walRootDir, wal); + fs.truncate(walPath, 0); + } + + public boolean isReplayWALFinished(String wal) throws IOException { + Path walPath = new Path(walRootDir, wal); + return fs.getFileStatus(walPath).getLen() == 0; + } + + @VisibleForTesting + public Path getRemoteWALDir() { + return remoteWALDir; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java new file mode 100644 index 0000000..26d6a3f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData; + +@InterfaceAudience.Private +public class SyncReplicationReplayWALProcedure + extends StateMachineProcedure<MasterProcedureEnv, SyncReplicationReplayWALState> + implements PeerProcedureInterface { + + private static final Logger LOG = + LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); + + private String peerId; + + private ServerName worker = null; + + private List<String> wals; + + public SyncReplicationReplayWALProcedure() { + } + + public SyncReplicationReplayWALProcedure(String peerId, List<String> wals) { + this.peerId = peerId; + this.wals = wals; + } + + @Override protected Flow executeFromState(MasterProcedureEnv env, + SyncReplicationReplayWALState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SyncReplicationReplayWALManager syncReplicationReplayWALManager = + env.getMasterServices().getSyncReplicationReplayWALManager(); + switch (state) { + case ASSIGN_WORKER: + try { + worker = syncReplicationReplayWALManager.getPeerWorker(peerId); + } catch (ReplicationException e) { + LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId); + throw new ProcedureYieldException(); + } + if (worker == null) { + LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId); + setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); + } else { + setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER); + } + return Flow.HAS_MORE_STATE; + case DISPATCH_WALS_TO_WORKER: + addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker)); + setNextState(SyncReplicationReplayWALState.RELEASE_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_WORKER: + boolean finished = false; + try { + finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); + } catch (IOException e) { + LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId); + throw new ProcedureYieldException(); + } + try { + syncReplicationReplayWALManager.removePeerWorker(peerId, worker); + } catch (ReplicationException e) { + LOG.info("Failed to remove worker for peer id={}, retry", peerId); + throw new ProcedureYieldException(); + } + if (!finished) { + LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId); + setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); + return Flow.HAS_MORE_STATE; + } + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + SyncReplicationReplayWALState state) + throws IOException, InterruptedException { + if (state == getInitialState()) { + return; + } + throw new UnsupportedOperationException(); + } + + @Override + protected SyncReplicationReplayWALState getState(int state) { + return SyncReplicationReplayWALState.forNumber(state); + } + + @Override + protected int getStateId( + SyncReplicationReplayWALState state) { + return state.getNumber(); + } + + @Override + protected SyncReplicationReplayWALState getInitialState() { + return SyncReplicationReplayWALState.ASSIGN_WORKER; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + SyncReplicationReplayWALStateData.Builder builder = + SyncReplicationReplayWALStateData.newBuilder(); + builder.setPeerId(peerId); + wals.stream().forEach(builder::addWal); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + SyncReplicationReplayWALStateData data = + serializer.deserialize(SyncReplicationReplayWALStateData.class); + peerId = data.getPeerId(); + wals = new ArrayList<>(); + data.getWalList().forEach(wals::add); + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java new file mode 100644 index 0000000..9f4f330 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData; + +@InterfaceAudience.Private +public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterProcedureEnv> + implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface { + + private static final Logger LOG = + LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class); + + private String peerId; + + private ServerName targetServer; + + private List<String> wals; + + private boolean dispatched; + + private ProcedureEvent<?> event; + + private boolean succ; + + public SyncReplicationReplayWALRemoteProcedure() { + } + + public SyncReplicationReplayWALRemoteProcedure(String peerId, List<String> wals, + ServerName targetServer) { + this.peerId = peerId; + this.wals = wals; + this.targetServer = targetServer; + } + + @Override + public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) { + ReplaySyncReplicationWALParameter.Builder builder = + ReplaySyncReplicationWALParameter.newBuilder(); + builder.setPeerId(peerId); + wals.stream().forEach(builder::addWal); + return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class, + builder.build().toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + if (error != null) { + LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error); + this.succ = false; + } else { + truncateWALs(env); + LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId); + this.succ = true; + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + /** + * Only truncate wals one by one when task succeed. The parent procedure will check the first + * wal length to know whether this task succeed. + */ + private void truncateWALs(MasterProcedureEnv env) { + String firstWal = wals.get(0); + try { + env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(firstWal); + } catch (IOException e) { + // As it is idempotent to rerun this task. Just ignore this exception and return. + LOG.warn("Failed to truncate wal {} for peer id={}", firstWal, peerId, e); + return; + } + for (int i = 1; i < wals.size(); i++) { + String wal = wals.get(i); + try { + env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal); + } catch (IOException e1) { + try { + // retry + env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal); + } catch (IOException e2) { + // As the parent procedure only check the first wal length. Just ignore this exception. + LOG.warn("Failed to truncate wal {} for peer id={}", wal, peerId, e2); + } + } + } + } + + @Override + protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + // retry + dispatched = false; + } + + // Dispatch task to target server + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + } catch (FailedRemoteDispatchException e) { + LOG.warn( + "Can not add remote operation for replay wals {} on {} for peer id={}, " + + "this usually because the server is already dead, retry", + wals, targetServer, peerId); + throw new ProcedureYieldException(); + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + SyncReplicationReplayWALRemoteStateData.Builder builder = + SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId) + .setTargetServer(ProtobufUtil.toServerName(targetServer)); + wals.stream().forEach(builder::addWal); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + SyncReplicationReplayWALRemoteStateData data = + serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class); + peerId = data.getPeerId(); + wals = new ArrayList<>(); + data.getWalList().forEach(wals::add); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index 66f67dd..c650974 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -186,8 +186,8 @@ public class TransitPeerSyncReplicationStateProcedure } } - private void replayRemoteWAL() { - addChildProcedure(new RecoverStandbyProcedure(peerId)); + private void replayRemoteWAL(boolean serial) { + addChildProcedure(new RecoverStandbyProcedure(peerId, serial)); } @Override @@ -232,7 +232,7 @@ public class TransitPeerSyncReplicationStateProcedure setNextStateAfterRefreshBegin(); return Flow.HAS_MORE_STATE; case REPLAY_REMOTE_WAL_IN_PEER: - replayRemoteWAL(); + replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial()); setNextState( PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); return Flow.HAS_MORE_STATE; http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java new file mode 100644 index 0000000..5991cf0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +@InterfaceAudience.Private +public class ZKSyncReplicationReplayWALWorkerStorage extends ZKReplicationStorageBase { + + public static final String WORKERS_ZNODE = "zookeeper.znode.sync.replication.replaywal.workers"; + + public static final String WORKERS_ZNODE_DEFAULT = "replaywal-workers"; + + /** + * The name of the znode that contains a list of workers to replay wal. + */ + private final String workersZNode; + + public ZKSyncReplicationReplayWALWorkerStorage(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + String workersZNodeName = conf.get(WORKERS_ZNODE, WORKERS_ZNODE_DEFAULT); + workersZNode = ZNodePaths.joinZNode(replicationZNode, workersZNodeName); + } + + private String getPeerNode(String peerId) { + return ZNodePaths.joinZNode(workersZNode, peerId); + } + + public void addPeer(String peerId) throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, getPeerNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to add peer id=" + peerId + " to replaywal-workers storage", e); + } + } + + public void removePeer(String peerId) throws ReplicationException { + try { + ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to remove peer id=" + peerId + " to replaywal-workers storage", e); + } + } + + private String getPeerWorkerNode(String peerId, ServerName worker) { + return ZNodePaths.joinZNode(getPeerNode(peerId), worker.getServerName()); + } + + public void addPeerWorker(String peerId, ServerName worker) throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, getPeerWorkerNode(peerId, worker)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add worker=" + worker + " for peer id=" + peerId, + e); + } + } + + public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException { + try { + ZKUtil.deleteNode(zookeeper, getPeerWorkerNode(peerId, worker)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove worker=" + worker + " for peer id=" + peerId, + e); + } + } + + public Set<ServerName> getPeerWorkers(String peerId) throws ReplicationException { + try { + List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getPeerNode(peerId)); + if (children == null) { + return new HashSet<>(); + } + return children.stream().map(ServerName::valueOf).collect(Collectors.toSet()); + } catch (KeeperException e) { + throw new ReplicationException("Failed to list workers for peer id=" + peerId, e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 3cf065c..24963f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -21,6 +21,8 @@ import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.Lock; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; @@ -68,31 +71,28 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { private String peerId; - private String wal; + private List<String> wals = new ArrayList<>(); private Exception initError; private long batchSize; + private final KeyLocker<String> peersLock = new KeyLocker<>(); + @Override public Void call() throws Exception { if (initError != null) { throw initError; } - LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId); + LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId); if (rs.getReplicationSinkService() != null) { - try (Reader reader = getReader()) { - List<Entry> entries = readWALEntries(reader); - while (!entries.isEmpty()) { - Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil - .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); - ReplicateWALEntryRequest request = pair.getFirst(); - rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), - pair.getSecond(), request.getReplicationClusterId(), - request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); - // Read next entries. - entries = readWALEntries(reader); + Lock peerLock = peersLock.acquireLock(wals.get(0)); + try { + for (String wal : wals) { + replayWAL(wal); } + } finally { + peerLock.unlock(); } } return null; @@ -107,7 +107,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { ReplaySyncReplicationWALParameter param = ReplaySyncReplicationWALParameter.parseFrom(parameter); this.peerId = param.getPeerId(); - this.wal = param.getWal(); + param.getWalList().forEach(this.wals::add); this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE, DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE); } catch (InvalidProtocolBufferException e) { @@ -120,7 +120,23 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { return EventType.RS_REPLAY_SYNC_REPLICATION_WAL; } - private Reader getReader() throws IOException { + private void replayWAL(String wal) throws IOException { + try (Reader reader = getReader(wal)) { + List<Entry> entries = readWALEntries(reader); + while (!entries.isEmpty()) { + Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil + .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); + ReplicateWALEntryRequest request = pair.getFirst(); + rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), + pair.getSecond(), request.getReplicationClusterId(), + request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); + // Read next entries. + entries = readWALEntries(reader); + } + } + } + + private Reader getReader(String wal) throws IOException { Path path = new Path(rs.getWALRootDir(), wal); long length = rs.getWALFileSystem().getFileStatus(path).getLen(); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 48d47ea..ac20dbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -476,7 +476,7 @@ public class MockNoopMasterServices implements MasterServices { } @Override - public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() { + public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() { return null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index a20edd3..f765139 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -78,7 +78,7 @@ public class SyncReplicationTestBase { protected static Path REMOTE_WAL_DIR2; - private static void initTestingUtility(HBaseTestingUtility util, String zkParent) { + protected static void initTestingUtility(HBaseTestingUtility util, String zkParent) { util.setZkCluster(ZK_UTIL.getZkCluster()); Configuration conf = util.getConfiguration(); conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent); @@ -102,8 +102,8 @@ public class SyncReplicationTestBase { ZK_UTIL.startMiniZKCluster(); initTestingUtility(UTIL1, "/cluster1"); initTestingUtility(UTIL2, "/cluster2"); - UTIL1.startMiniCluster(3); - UTIL2.startMiniCluster(3); + UTIL1.startMiniCluster(2,3); + UTIL2.startMiniCluster(2,3); TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java new file mode 100644 index 0000000..6265f5c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplicationStandbyKillMaster extends SyncReplicationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSyncReplicationStandbyKillMaster.class); + + private final long SLEEP_TIME = 2000; + + private final int COUNT = 1000; + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillMaster.class); + + @Test + public void testStandbyKillMaster() throws Exception { + MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); + Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID); + assertFalse(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + assertTrue(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + // Disable async replication and write data, then shutdown + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, COUNT); + UTIL1.shutdownMiniCluster(); + + Thread t = new Thread(() -> { + try { + Thread.sleep(SLEEP_TIME); + UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test"); + } catch (Exception e) { + LOG.error("Failed to stop master", e); + } + }); + t.start(); + + // Transit standby to DA to replay logs + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (Exception e) { + LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE); + } + + while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID) + != SyncReplicationState.DOWNGRADE_ACTIVE) { + Thread.sleep(SLEEP_TIME); + } + verify(UTIL2, 0, COUNT); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java new file mode 100644 index 0000000..3c9724f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class); + + private final long SLEEP_TIME = 1000; + + private final int COUNT = 1000; + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class); + + @Test + public void testStandbyKillRegionServer() throws Exception { + MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); + Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID); + assertFalse(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + assertTrue(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + // Disable async replication and write data, then shutdown + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, COUNT); + UTIL1.shutdownMiniCluster(); + + JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread(); + Thread t = new Thread(() -> { + try { + List<JVMClusterUtil.RegionServerThread> regionServers = + UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads(); + for (JVMClusterUtil.RegionServerThread rst : regionServers) { + ServerName serverName = rst.getRegionServer().getServerName(); + rst.getRegionServer().stop("Stop RS for test"); + waitForRSShutdownToStartAndFinish(activeMaster, serverName); + JVMClusterUtil.RegionServerThread restarted = + UTIL2.getMiniHBaseCluster().startRegionServer(); + restarted.waitForServerOnline(); + } + } catch (Exception e) { + LOG.error("Failed to kill RS", e); + } + }); + t.start(); + + // Transit standby to DA to replay logs + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (Exception e) { + LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE); + } + + while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID) + != SyncReplicationState.DOWNGRADE_ACTIVE) { + Thread.sleep(SLEEP_TIME); + } + verify(UTIL2, 0, COUNT); + } + + private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster, + ServerName serverName) throws InterruptedException { + ServerManager sm = activeMaster.getMaster().getServerManager(); + // First wait for it to be in dead list + while (!sm.getDeadServers().isDeadServer(serverName)) { + LOG.debug("Waiting for [" + serverName + "] to be listed as dead in master"); + Thread.sleep(SLEEP_TIME); + } + LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " + + "finish dead processing"); + while (sm.areDeadServersInProgress()) { + LOG.debug("Server [" + serverName + "] still being processed, waiting"); + Thread.sleep(SLEEP_TIME); + } + LOG.debug("Server [" + serverName + "] done with server shutdown processing"); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java index 2563669..d01a0ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; @@ -92,7 +92,7 @@ public class TestRecoverStandbyProcedure { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static ReplaySyncReplicationWALManager replaySyncReplicationWALManager; + private static SyncReplicationReplayWALManager syncReplicationReplayWALManager; private static ProcedureExecutor<MasterProcedureEnv> procExec; @@ -107,7 +107,7 @@ public class TestRecoverStandbyProcedure { conf = UTIL.getConfiguration(); HMaster master = UTIL.getHBaseCluster().getMaster(); fs = master.getMasterFileSystem().getWALFileSystem(); - replaySyncReplicationWALManager = master.getReplaySyncReplicationWALManager(); + syncReplicationReplayWALManager = master.getSyncReplicationReplayWALManager(); procExec = master.getMasterProcedureExecutor(); } @@ -138,7 +138,7 @@ public class TestRecoverStandbyProcedure { @Test public void testRecoverStandby() throws IOException, StreamLacksCapabilityException { setupSyncReplicationWALs(); - long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID)); + long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID, false)); ProcedureTestingUtility.waitProcedure(procExec, procId); ProcedureTestingUtility.assertProcNotFailed(procExec, procId); @@ -153,7 +153,7 @@ public class TestRecoverStandbyProcedure { private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException { Path peerRemoteWALDir = ReplicationUtils - .getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID); + .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID); if (!fs.exists(peerRemoteWALDir)) { fs.mkdirs(peerRemoteWALDir); }