HBASE-20426 Give up replicating anything in S state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9174b3ff Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9174b3ff Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9174b3ff Branch: refs/heads/HBASE-19064 Commit: 9174b3ff40f5bcea199d1306285e4f210fee929c Parents: e29e77e Author: zhangduo <zhang...@apache.org> Authored: Thu May 3 15:51:35 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri May 25 10:11:48 2018 +0800 ---------------------------------------------------------------------- .../src/main/protobuf/MasterProcedure.proto | 13 +- .../replication/AbstractPeerProcedure.java | 4 + .../master/replication/ModifyPeerProcedure.java | 6 - .../replication/ReplicationPeerManager.java | 13 +- ...ransitPeerSyncReplicationStateProcedure.java | 94 +++++++++++---- .../hadoop/hbase/regionserver/LogRoller.java | 11 +- .../regionserver/PeerProcedureHandlerImpl.java | 63 ++++++++-- .../regionserver/ReplicationSource.java | 1 + .../regionserver/ReplicationSourceManager.java | 118 ++++++++++++++++--- .../TestDrainReplicationQueuesForStandBy.java | 118 +++++++++++++++++++ 10 files changed, 379 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 01e4dae..f15cb04 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -394,11 +394,14 @@ enum PeerSyncReplicationStateTransitionState { SET_PEER_NEW_SYNC_REPLICATION_STATE = 2; REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3; REPLAY_REMOTE_WAL_IN_PEER = 4; - REOPEN_ALL_REGIONS_IN_PEER = 5; - TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6; - REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7; - CREATE_DIR_FOR_REMOTE_WAL = 8; - POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9; + REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5; + REOPEN_ALL_REGIONS_IN_PEER = 6; + TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7; + REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8; + SYNC_REPLICATION_SET_PEER_ENABLED = 9; + SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10; + CREATE_DIR_FOR_REMOTE_WAL = 11; + POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12; } message PeerModificationStateData { http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index 6679d78..458e073 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -106,4 +106,8 @@ public abstract class AbstractPeerProcedure<TState> throw new UnsupportedOperationException(); } + protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) { + 
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new)); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 32b8ea1..56462ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -108,12 +108,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi throw new UnsupportedOperationException(); } - private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) { - addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() - .map(sn -> new RefreshPeerProcedure(peerId, type, sn)) - .toArray(RefreshPeerProcedure[]::new)); - } - protected ReplicationPeerConfig getOldPeerConfig() { return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 229549e..e1d8b51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -192,9 +193,9 @@ public class ReplicationPeerManager { } /** - * @return the old state. + * @return the old state, and whether the peer is enabled. 
*/ - public SyncReplicationState preTransitPeerSyncReplicationState(String peerId, + Pair<SyncReplicationState, Boolean> preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state) throws DoNotRetryIOException { ReplicationPeerDescription desc = checkPeerExists(peerId); SyncReplicationState fromState = desc.getSyncReplicationState(); @@ -203,7 +204,7 @@ public class ReplicationPeerManager { throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState + " to " + state + " for peer id=" + peerId); } - return fromState; + return Pair.newPair(fromState, desc.isEnabled()); } public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) @@ -303,7 +304,7 @@ public class ReplicationPeerManager { } } - public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + public void removeAllQueues(String peerId) throws ReplicationException { // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // on-going when the refresh peer config procedure is done, if a RS which has already been // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in @@ -317,6 +318,10 @@ public class ReplicationPeerManager { // unless it has already been removed by others. ReplicationUtils.removeAllQueues(queueStorage, peerId); ReplicationUtils.removeAllQueues(queueStorage, peerId); + } + + public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + removeAllQueues(peerId); queueStorage.removePeerFromHFileRefs(peerId); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index 99fd615..0175296 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,8 @@ public class TransitPeerSyncReplicationStateProcedure private SyncReplicationState toState; + private boolean enabled; + public TransitPeerSyncReplicationStateProcedure() { } @@ -110,7 +113,10 @@ public class TransitPeerSyncReplicationStateProcedure if (cpHost != null) { cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState); } - fromState = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState); + Pair<SyncReplicationState, Boolean> pair = + env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState); + fromState = pair.getFirst(); + enabled = pair.getSecond(); } private void postTransit(MasterProcedureEnv env) throws IOException { @@ -131,6 +137,21 @@ public 
class TransitPeerSyncReplicationStateProcedure .collect(Collectors.toList()); } + private void createDirForRemoteWAL(MasterProcedureEnv env) + throws ProcedureYieldException, IOException { + MasterFileSystem mfs = env.getMasterFileSystem(); + Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); + Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId); + FileSystem walFs = mfs.getWALFileSystem(); + if (walFs.exists(remoteWALDirForPeer)) { + LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", + remoteWALDirForPeer); + } else if (!walFs.mkdirs(remoteWALDirForPeer)) { + LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer); + throw new ProcedureYieldException(); + } + } + @Override protected Flow executeFromState(MasterProcedureEnv env, PeerSyncReplicationStateTransitionState state) @@ -151,6 +172,13 @@ public class TransitPeerSyncReplicationStateProcedure case SET_PEER_NEW_SYNC_REPLICATION_STATE: try { env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState); + if (toState.equals(SyncReplicationState.STANDBY) && enabled) { + // disable the peer if we are going to transit to STANDBY state, as we need to remove + // all the pending replication files. If we do not disable the peer and delete the wal + // queues on zk directly, RS will get NoNode exception when updating the wal position + // and crash. + env.getReplicationPeerManager().disablePeer(peerId); + } } catch (ReplicationException e) { LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " + "replication peer state from {} to {}, retry", peerId, fromState, toState, e); @@ -163,16 +191,35 @@ public class TransitPeerSyncReplicationStateProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0)) .toArray(RefreshPeerProcedure[]::new)); - if (fromState == SyncReplicationState.STANDBY && - toState == SyncReplicationState.DOWNGRADE_ACTIVE) { - setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); + if (fromState.equals(SyncReplicationState.ACTIVE)) { + setNextState(toState.equals(SyncReplicationState.STANDBY) + ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER + : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) { + setNextState(toState.equals(SyncReplicationState.STANDBY) + ? 
PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER + : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); } else { - setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE); + setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); } return Flow.HAS_MORE_STATE; case REPLAY_REMOTE_WAL_IN_PEER: addChildProcedure(new RecoverStandbyProcedure(peerId)); - setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + setNextState( + PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); + return Flow.HAS_MORE_STATE; + case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER: + try { + env.getReplicationPeerManager().removeAllQueues(peerId); + } catch (ReplicationException e) { + LOG.warn("Failed to remove all replication queues for peer {} when starting transiting" + + " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e); + throw new ProcedureYieldException(); + } + setNextState(fromState.equals(SyncReplicationState.ACTIVE) + ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER + : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); return Flow.HAS_MORE_STATE; case REOPEN_ALL_REGIONS_IN_PEER: try { @@ -202,27 +249,35 @@ public class TransitPeerSyncReplicationStateProcedure .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1)) .toArray(RefreshPeerProcedure[]::new)); if (toState == SyncReplicationState.STANDBY) { - setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL); + setNextState( + enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED + : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL); } else { setNextState( PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); } return Flow.HAS_MORE_STATE; + case SYNC_REPLICATION_SET_PEER_ENABLED: + try { + env.getReplicationPeerManager().enablePeer(peerId); + } catch (ReplicationException e) { + LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " + + "state from {} to {}, retry", peerId, fromState, toState, e); + throw new ProcedureYieldException(); + } + setNextState( + PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS); + return Flow.HAS_MORE_STATE; + case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS: + refreshPeer(env, PeerOperationType.ENABLE); + setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL); + return Flow.HAS_MORE_STATE; case CREATE_DIR_FOR_REMOTE_WAL: - MasterFileSystem mfs = env.getMasterFileSystem(); - Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); - Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId); - FileSystem walFs = mfs.getWALFileSystem(); try { - if (walFs.exists(remoteWALDirForPeer)) { - LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", - remoteWALDirForPeer); - } else if (!walFs.mkdirs(remoteWALDirForPeer)) { - LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer); - throw new ProcedureYieldException(); - } + createDirForRemoteWAL(env); } catch (IOException e) { - LOG.warn("Failed to create remote wal dir {}", remoteWALDirForPeer, e); + LOG.warn("Failed to create remote wal dir for peer {} when transiting sync
replication " + + "peer state from {} to {}, retry", peerId, fromState, toState, e); throw new ProcedureYieldException(); } setNextState( @@ -242,5 +297,4 @@ public class TransitPeerSyncReplicationStateProcedure throw new UnsupportedOperationException("unhandled state=" + state); } } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index ab0083f..05a8fdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -244,10 +244,8 @@ public class LogRoller extends HasThread implements Closeable { } /** - * For testing only * @return true if all WAL roll finished */ - @VisibleForTesting public boolean walRollFinished() { for (boolean needRoll : walNeedsRoll.values()) { if (needRoll) { @@ -257,6 +255,15 @@ public class LogRoller extends HasThread implements Closeable { return true; } + /** + * Wait until all wals have been rolled after calling {@link #requestRollAll()}. + */ + public void waitUntilWalRollFinished() throws InterruptedException { + while (!walRollFinished()) { + Thread.sleep(100); + } + } + @Override public void close() { running = false; http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index 7fc9f53..d01b130 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.LogRoller; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -154,24 +156,65 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { if (!peer.getPeerConfig().isSyncReplication()) { throw new ReplicationException("Peer with id=" + peerId + " is not synchronous."); } - SyncReplicationState newState = peer.getNewSyncReplicationState(); + SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState(); if (stage == 0) { - if (newState != SyncReplicationState.NONE) { + if (newSyncReplicationState != SyncReplicationState.NONE) { LOG.warn("The new sync replication state for peer {} has already been set to {}, " + - "this should be a retry, give up", peerId, newState); + "this should be a retry, give up", peerId, newSyncReplicationState); return; } - newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId); - 
SyncReplicationState oldState = peer.getSyncReplicationState(); - peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage); + // refresh the peer state first, as when we transit to STANDBY, we may need to disable the + // peer before processing the sync replication state. + PeerState oldState = peer.getPeerState(); + boolean success = false; + try { + PeerState newState = replicationPeers.refreshPeerState(peerId); + if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { + replicationSourceManager.refreshSources(peerId); + } + success = true; + } finally { + if (!success) { + peer.setPeerState(oldState.equals(PeerState.ENABLED)); + } + } + newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId); + SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState(); + peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState, + newSyncReplicationState, stage); } else { - if (newState == SyncReplicationState.NONE) { - LOG.warn("The new sync replication state for peer {} has already been clear, and the " + - "current state is {}, this should be a retry, give up", peerId, newState); + if (newSyncReplicationState == SyncReplicationState.NONE) { + LOG.warn( + "The new sync replication state for peer {} has already been cleared, and the " + + "current state is {}, this should be a retry, give up", + peerId, newSyncReplicationState); return; } + if (newSyncReplicationState == SyncReplicationState.STANDBY) { + replicationSourceManager.drainSources(peerId); + // Need to roll the wals and make the ReplicationSource for this peer track the new file. + // If we do not do this, there will be two problems that can not be addressed at the same + // time. First, if we just throw away the current wal file, and later when we transit the + // peer to DA, and the wal has not been rolled yet, then the new data written to the wal + // file will not be replicated and cause data inconsistency. But if we just track the + // current wal file without rolling, it may contain some data before we transit the peer + // to S; later, if we transit the peer to DA, the data will also be replicated and cause + // data inconsistency. So here we need to roll the wal, and let the ReplicationSource + // track the new wal file, and throw the old wal files away.
+ LogRoller roller = rs.getWalRoller(); + roller.requestRollAll(); + try { + roller.waitUntilWalRollFinished(); + } catch (InterruptedException e) { + // reset the interrupted flag + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException( + "Interrupted while waiting for wal roll finish").initCause(e); + } + } SyncReplicationState oldState = peer.getSyncReplicationState(); - peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage); + peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState, + stage); peer.transitSyncReplicationState(); } } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c669622..1cac0c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -500,6 +500,7 @@ public class ReplicationSource implements ReplicationSourceInterface { if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } + metrics.clear(); if (join) { for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries); http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5015129..f25b073 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; @@ -391,11 +392,83 @@ public class ReplicationSourceManager implements ReplicationListener { } /** + * <p> + * This is used when we transit a sync replication peer to {@link SyncReplicationState#STANDBY}. + * </p> + * <p> + * When transiting to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal + * files for a replication peer as we do not need to replicate them any more. And this is + * necessary, otherwise when we transit back to {@link SyncReplicationState#DOWNGRADE_ACTIVE} + * later, the stale data will be replicated again and cause inconsistency. + * </p> + * <p> + * See HBASE-20426 for more details. 
+ * </p> + * @param peerId the id of the sync replication peer + */ + public void drainSources(String peerId) throws IOException, ReplicationException { + String terminateMessage = "Sync replication peer " + peerId + + " is transiting to STANDBY. Will close the previous replication source and open a new one"; + ReplicationPeer peer = replicationPeers.getPeer(peerId); + assert peer.getPeerConfig().isSyncReplication(); + ReplicationSourceInterface src = createSource(peerId, peer); + // synchronized here to avoid race with preLogRoll where we add new log to source and also + // walsById. + ReplicationSourceInterface toRemove; + Map<String, NavigableSet<String>> wals = new HashMap<>(); + synchronized (latestPaths) { + toRemove = sources.put(peerId, src); + if (toRemove != null) { + LOG.info("Terminate replication source for " + toRemove.getPeerId()); + toRemove.terminate(terminateMessage); + toRemove.getSourceMetrics().clear(); + } + // Here we make a copy of all the remaining wal files and then delete them from the + // replication queue storage after releasing the lock. It is not safe to just remove the old + // map from walsById since later we may fail to delete them from the replication queue + // storage, and when we retry next time, we can not know the wal files that need to be deleted + // from the replication queue storage. + walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v))); + } + LOG.info("Startup replication source for " + src.getPeerId()); + src.startup(); + for (NavigableSet<String> walsByGroup : wals.values()) { + for (String wal : walsByGroup) { + queueStorage.removeWAL(server.getServerName(), peerId, wal); + } + } + synchronized (walsById) { + Map<String, NavigableSet<String>> oldWals = walsById.get(peerId); + wals.forEach((k, v) -> { + NavigableSet<String> walsByGroup = oldWals.get(k); + if (walsByGroup != null) { + walsByGroup.removeAll(v); + } + }); + } + // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is + // a background task, we will delete the file from replication queue storage under the lock to + // simplify the logic. + synchronized (this.oldsources) { + for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) { + ReplicationSourceInterface oldSource = iter.next(); + if (oldSource.getPeerId().equals(peerId)) { + String queueId = oldSource.getQueueId(); + oldSource.terminate(terminateMessage); + oldSource.getSourceMetrics().clear(); + queueStorage.removeQueue(server.getServerName(), queueId); + walsByIdRecoveredQueues.remove(queueId); + iter.remove(); + } + } + } + } + + /** * Close the previous replication sources of this peer id and open new sources to trigger the new * replication state changes or new replication config changes. 
Here we don't need to change * replication queue storage and only to enqueue all logs to the new replication source * @param peerId the id of the replication peer - * @throws IOException */ public void refreshSources(String peerId) throws IOException { String terminateMessage = "Peer " + peerId + @@ -409,7 +482,7 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Terminate replication source for " + toRemove.getPeerId()); toRemove.terminate(terminateMessage); } - for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) { + for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) { walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); } } @@ -830,18 +903,6 @@ public class ReplicationSourceManager implements ReplicationListener { actualPeerId); continue; } - // track sources in walsByIdRecoveredQueues - Map<String, NavigableSet<String>> walsByGroup = new HashMap<>(); - walsByIdRecoveredQueues.put(queueId, walsByGroup); - for (String wal : walsSet) { - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); - NavigableSet<String> wals = walsByGroup.get(walPrefix); - if (wals == null) { - wals = new TreeSet<>(); - walsByGroup.put(walPrefix, wals); - } - wals.add(wal); - } ReplicationSourceInterface src = createSource(queueId, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer @@ -849,9 +910,36 @@ public class ReplicationSourceManager implements ReplicationListener { peer = replicationPeers.getPeer(src.getPeerId()); if (peer == null || !isOldPeer(src.getPeerId(), peer)) { src.terminate("Recovered queue doesn't belong to any current peer"); - removeRecoveredSource(src); + deleteQueue(queueId); continue; } + // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is + // transiting to STANDBY state. The only exception is we are in STANDBY state and + // transiting to DA, under this state we will replay the remote WAL and they need to be + // replicated back. 
+ if (peer.getPeerConfig().isSyncReplication()) { + Pair<SyncReplicationState, SyncReplicationState> stateAndNewState = + peer.getSyncReplicationStateAndNewState(); + if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) && + stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) || + stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) { + src.terminate("Sync replication peer is in STANDBY state"); + deleteQueue(queueId); + continue; + } + } + // track sources in walsByIdRecoveredQueues + Map<String, NavigableSet<String>> walsByGroup = new HashMap<>(); + walsByIdRecoveredQueues.put(queueId, walsByGroup); + for (String wal : walsSet) { + String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); + NavigableSet<String> wals = walsByGroup.get(walPrefix); + if (wals == null) { + wals = new TreeSet<>(); + walsByGroup.put(walPrefix, wals); + } + wals.add(wal); + } oldsources.add(src); for (String wal : walsSet) { src.enqueueLog(new Path(oldLogDir, wal)); http://git-wip-us.apache.org/repos/asf/hbase/blob/9174b3ff/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java new file mode 100644 index 0000000..5da7870 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.SyncReplicationTestBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDrainReplicationQueuesForStandBy.class); + + @Test + public void test() throws Exception { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, 100); + + HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME); + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName( + ((AbstractFSWAL<?>) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build())) + .getCurrentFileName().getName()); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + // transit cluster2 to DA and cluster1 to S + verify(UTIL2, 0, 100); + + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + // delete the original values, and then major compact + try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) { + for (int i = 0; i < 100; i++) { + table.delete(new Delete(Bytes.toBytes(i))); + } + } + UTIL2.flush(TABLE_NAME); + UTIL2.compact(TABLE_NAME, true); + // wait until the new values are replicated back to cluster1 + HRegion region = rs.getRegions(TABLE_NAME).get(0); + UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return region.get(new Get(Bytes.toBytes(99))).isEmpty(); + } + + @Override + public String explainFailure() throws Exception { + return "Replication has not caught up yet"; + } + }); + // transit cluster1 to DA and cluster2 to S, then we will start replicating from cluster1 to + // cluster2 + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + UTIL1.getAdmin().enableReplicationPeer(PEER_ID); + + // confirm that we will not replicate the old data which causes inconsistency + ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService()) + .getReplicationManager().getSource(PEER_ID); +
UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return !source.workerThreads.containsKey(walGroupId); + } + + @Override + public String explainFailure() throws Exception { + return "Replication has not caught up yet"; + } + }); + HRegion region2 = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); + for (int i = 0; i < 100; i++) { + assertTrue(region2.get(new Get(Bytes.toBytes(i))).isEmpty()); + } + } +} \ No newline at end of file
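----------------------------------------------------------------------
For context, the sketch below shows the operator-side flow this patch hardens: swapping the ACTIVE and STANDBY roles of a synchronous replication peer through the Admin API, in the same order as TestDrainReplicationQueuesForStandBy above (promote the STANDBY side to DOWNGRADE_ACTIVE first, then demote the old ACTIVE side). It is an illustrative sketch, not part of this commit: the class name, peer id and ZooKeeper quorum handling are assumptions; only the Admin#transitReplicationPeerSyncReplicationState calls come from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

// Illustrative sketch only: swap the sync replication roles of two clusters for an
// existing peer. Cluster addresses and the peer id are assumptions passed on the
// command line, e.g. <active-zk-quorum> <standby-zk-quorum> <peer-id>.
public class SwapSyncReplicationRoles {

  private static Connection connect(String zkQuorum) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", zkQuorum);
    return ConnectionFactory.createConnection(conf);
  }

  public static void main(String[] args) throws Exception {
    String activeQuorum = args[0];  // ZK quorum of the cluster currently in ACTIVE state
    String standbyQuorum = args[1]; // ZK quorum of the cluster currently in STANDBY state
    String peerId = args[2];        // id of the sync replication peer on both clusters
    try (Connection activeConn = connect(activeQuorum);
        Connection standbyConn = connect(standbyQuorum);
        Admin active = activeConn.getAdmin();
        Admin standby = standbyConn.getAdmin()) {
      // Promote the STANDBY side first so it can serve writes as DOWNGRADE_ACTIVE.
      standby.transitReplicationPeerSyncReplicationState(peerId,
        SyncReplicationState.DOWNGRADE_ACTIVE);
      // Then demote the old ACTIVE side. With this change the transition disables the
      // peer, removes its replication queues and rolls the WALs, so edits queued before
      // the switch are not shipped later and cannot re-introduce stale data.
      active.transitReplicationPeerSyncReplicationState(peerId,
        SyncReplicationState.STANDBY);
    }
  }
}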