HBASE-20424 Allow writing WAL to local and remote cluster concurrently
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/599080cc Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/599080cc Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/599080cc Branch: refs/heads/HBASE-19064 Commit: 599080ccd46ec8a09859bef95c5b096b708fa238 Parents: fbddf63 Author: zhangduo <zhang...@apache.org> Authored: Thu May 24 16:20:28 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Jun 5 18:13:59 2018 +0800 ---------------------------------------------------------------------- .../src/main/protobuf/MasterProcedure.proto | 2 +- .../hbase/replication/ReplicationUtils.java | 26 ++- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 3 +- .../replication/RecoverStandbyProcedure.java | 10 +- .../master/replication/RemovePeerProcedure.java | 5 +- .../ReplaySyncReplicationWALManager.java | 110 ++++++----- ...ransitPeerSyncReplicationStateProcedure.java | 4 +- .../hbase/regionserver/HRegionServer.java | 3 +- .../regionserver/ReplicationSourceService.java | 6 + .../hbase/regionserver/SplitLogWorker.java | 188 +++++++++++++------ .../regionserver/wal/CombinedAsyncWriter.java | 80 ++------ .../hbase/regionserver/wal/DualAsyncFSWAL.java | 11 +- .../replication/regionserver/Replication.java | 5 + .../regionserver/ReplicationSourceManager.java | 2 +- .../SyncReplicationPeerInfoProviderImpl.java | 3 +- .../org/apache/hadoop/hbase/util/FSUtils.java | 9 + .../hbase/wal/SyncReplicationWALProvider.java | 43 ++++- .../replication/TestReplicationAdmin.java | 2 +- .../wal/TestCombinedAsyncWriter.java | 20 +- .../replication/DualAsyncFSWALForTest.java | 149 +++++++++++++++ .../replication/SyncReplicationTestBase.java | 12 +- .../replication/TestSyncReplicationActive.java | 5 +- ...cReplicationMoreLogsInLocalCopyToRemote.java | 108 +++++++++++ ...plicationMoreLogsInLocalGiveUpSplitting.java | 128 +++++++++++++ .../TestSyncReplicationRemoveRemoteWAL.java | 7 +- 
.../replication/TestSyncReplicationStandBy.java | 20 +- .../master/TestRecoverStandbyProcedure.java | 4 +- .../TestReplicationSourceManager.java | 5 +- .../wal/TestSyncReplicationWALProvider.java | 1 - 29 files changed, 733 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index f58ad2e..5764a21 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -476,7 +476,7 @@ enum RecoverStandbyState { RENAME_SYNC_REPLICATION_WALS_DIR = 1; INIT_WORKERS = 2; DISPATCH_TASKS = 3; - REMOVE_SYNC_REPLICATION_WALS_DIR = 4; + SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4; } message RecoverStandbyStateData { http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index 069db7a..dc4217c 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -46,6 +46,16 @@ public final class ReplicationUtils { public static final String REMOTE_WAL_DIR_NAME = "remoteWALs"; + public static final String SYNC_WAL_SUFFIX = ".syncrep"; + + public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay"; + + public static final String 
REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot"; + + // This is used for copying sync replication log from local to remote and overwrite the old one + // since some FileSystem implementation may not support atomic rename. + public static final String RENAME_WAL_SUFFIX = ".ren"; + private ReplicationUtils() { } @@ -187,14 +197,26 @@ public final class ReplicationUtils { return new Path(remoteWALDir).getFileSystem(conf); } - public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) { + public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) { return new Path(remoteWALDir, peerId); } - public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) { + public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) { return new Path(remoteWALDir, peerId); } + public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) { + return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX); + } + + public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) { + return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX); + } + + public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) { + return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX); + } + /** * Do the sleeping logic * @param msg Why we sleep http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 1645d68..7ffd3da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -22,9 +22,9 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import java.io.IOException; import java.io.InterruptedIOException; @@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java index e9e3a97..9860774 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java @@ -50,7 +50,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb 
switch (state) { case RENAME_SYNC_REPLICATION_WALS_DIR: try { - replaySyncReplicationWALManager.renamePeerRemoteWALDir(peerId); + replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId); } catch (IOException e) { LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e); setFailure("master-recover-standby", e); @@ -67,11 +67,11 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb .map(wal -> new ReplaySyncReplicationWALProcedure(peerId, replaySyncReplicationWALManager.removeWALRootPath(wal))) .toArray(ReplaySyncReplicationWALProcedure[]::new)); - setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR); + setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR); return Flow.HAS_MORE_STATE; - case REMOVE_SYNC_REPLICATION_WALS_DIR: + case SNAPSHOT_SYNC_REPLICATION_WALS_DIR: try { - replaySyncReplicationWALManager.removePeerReplayWALDir(peerId); + replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId); } catch (IOException e) { LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); @@ -85,7 +85,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager) throws ProcedureYieldException { try { - return replaySyncReplicationWALManager.getReplayWALs(peerId); + return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId); } catch (IOException e) { LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 7335fe0..254448a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -67,10 +67,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { } private void removeRemoteWALs(MasterProcedureEnv env) throws IOException { - ReplaySyncReplicationWALManager remoteWALManager = - env.getMasterServices().getReplaySyncReplicationWALManager(); - remoteWALManager.removePeerRemoteWALs(peerId); - remoteWALManager.removePeerReplayWALDir(peerId); + env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java index eac5aa4..348c134 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.master.replication; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -25,31 +29,27 @@ import 
java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + @InterfaceAudience.Private public class ReplaySyncReplicationWALManager { private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class); - private static final String REPLAY_SUFFIX = "-replay"; - private final MasterServices services; - private final Configuration conf; - private final FileSystem fs; private final Path walRootDir; @@ -60,69 +60,86 @@ public class ReplaySyncReplicationWALManager { public ReplaySyncReplicationWALManager(MasterServices services) { this.services = services; - this.conf = services.getConfiguration(); this.fs = services.getMasterFileSystem().getWALFileSystem(); this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); } - public Path getPeerRemoteWALDir(String peerId) { - return new Path(this.remoteWALDir, peerId); - } - - private Path getPeerReplayWALDir(String peerId) { - return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX); - } - public void createPeerRemoteWALDir(String peerId) throws IOException { - Path peerRemoteWALDir = getPeerRemoteWALDir(peerId); + Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, 
peerId); if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { throw new IOException("Unable to mkdir " + peerRemoteWALDir); } } - public void renamePeerRemoteWALDir(String peerId) throws IOException { - Path peerRemoteWALDir = getPeerRemoteWALDir(peerId); - Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX); - if (fs.exists(peerRemoteWALDir)) { - if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) { - throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to " - + peerReplayWALDir + " for peer id=" + peerId); + private void rename(Path src, Path dst, String peerId) throws IOException { + if (fs.exists(src)) { + deleteDir(dst, peerId); + if (!fs.rename(src, dst)) { + throw new IOException( + "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); } - LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir, - peerId); - } else if (!fs.exists(peerReplayWALDir)) { - throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir " - + peerReplayWALDir + " not exist for peer id=" + peerId); + LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId); + } else if (!fs.exists(dst)) { + throw new IOException( + "Want to rename from " + src + " to " + dst + ", but they both do not exist"); } } - public List<Path> getReplayWALs(String peerId) throws IOException { - Path peerReplayWALDir = getPeerReplayWALDir(peerId); - List<Path> replayWals = new ArrayList<>(); - RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false); - while (iterator.hasNext()) { - replayWals.add(iterator.next().getPath()); + public void renameToPeerReplayWALDir(String peerId) throws IOException { + rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId), + peerId); + } + + public void renameToPeerSnapshotWALDir(String peerId) throws IOException { + rename(getPeerReplayWALDir(remoteWALDir, peerId), 
getPeerSnapshotWALDir(remoteWALDir, peerId), + peerId); + } + + public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException { + Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId); + for (FileStatus status : fs.listStatus(peerReplayWALDir, + p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) { + Path src = status.getPath(); + String srcName = src.getName(); + String dstName = + srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); + FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName)); + } + List<Path> wals = new ArrayList<>(); + for (FileStatus status : fs.listStatus(peerReplayWALDir)) { + Path path = status.getPath(); + if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) { + wals.add(path); + } else { + if (!fs.delete(path, true)) { + LOG.warn("Can not delete unused file: " + path); + } + } } - return replayWals; + return wals; } - public void removePeerReplayWALDir(String peerId) throws IOException { - Path peerReplayWALDir = getPeerReplayWALDir(peerId); + public void snapshotPeerReplayWALDir(String peerId) throws IOException { + Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId); if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) { throw new IOException( "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); } } - public void removePeerRemoteWALs(String peerId) throws IOException { - Path remoteWALDir = getPeerRemoteWALDir(peerId); - if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) { - throw new IOException( - "Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId); + private void deleteDir(Path dir, String peerId) throws IOException { + if (!fs.delete(dir, true) && fs.exists(dir)) { + throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId); } } + public void removePeerRemoteWALs(String peerId) throws IOException { + 
deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId); + deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId); + deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId); + } + public void initPeerWorkers(String peerId) { BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>(); services.getServerManager().getOnlineServers().keySet() @@ -144,4 +161,9 @@ public class ReplaySyncReplicationWALManager { // remove the "/" too. return pathStr.substring(walRootDir.toString().length() + 1); } + + @VisibleForTesting + public Path getRemoteWALDir() { + return remoteWALDir; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index ebe7a93..81ee6b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -118,7 +118,7 @@ public class TransitPeerSyncReplicationStateProcedure env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState); if (toState == SyncReplicationState.ACTIVE) { Path remoteWALDirForPeer = - ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId); + ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId); // check whether the remote wal directory is present if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration()) .exists(remoteWALDirForPeer)) { @@ -152,7 +152,7 @@ public class 
TransitPeerSyncReplicationStateProcedure throws ProcedureYieldException, IOException { MasterFileSystem mfs = env.getMasterFileSystem(); Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); - Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId); + Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); FileSystem walFs = mfs.getWALFileSystem(); if (walFs.exists(remoteWALDirForPeer)) { LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5c7bae0..268982a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1957,8 +1957,7 @@ public class HRegionServer extends HasThread implements sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); if (this.csm != null) { // SplitLogWorker needs csm. If none, don't start this. 
- this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, - this, walFactory); + this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); splitLogWorker.start(); } else { LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null"); http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 4529943..09ec477 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -37,4 +38,9 @@ public interface ReplicationSourceService extends ReplicationService { * Returns a Handler to handle peer procedures. */ PeerProcedureHandler getPeerProcedureHandler(); + + /** + * Return the replication peers. 
+ */ + ReplicationPeers getReplicationPeers(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index a1c2030..4a9712c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -23,22 +23,31 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.SocketTimeoutException; - -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Optional; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.FSUtils; +import 
org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -67,67 +76,133 @@ public class SplitLogWorker implements Runnable { Thread worker; // thread pool which executes recovery work private SplitLogWorkerCoordination coordination; - private Configuration conf; private RegionServerServices server; public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { this.server = server; - this.conf = conf; this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); coordination.init(server, conf, splitTaskExecutor, this); } - public SplitLogWorker(final Server hserver, final Configuration conf, - final RegionServerServices server, final LastSequenceId sequenceIdChecker, - final WALFactory factory) { - this(hserver, conf, server, new TaskExecutor() { - @Override - public Status exec(String filename, CancelableProgressable p) { - Path walDir; - FileSystem fs; - try { - walDir = FSUtils.getWALRootDir(conf); - fs = walDir.getFileSystem(conf); - } catch (IOException e) { - LOG.warn("could not find root dir or fs", e); - return Status.RESIGNED; - } - // TODO have to correctly figure out when log splitting has been - // interrupted or has encountered a transient error and when it has - // encountered a bad non-retry-able persistent error. 
- try { - if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), - fs, conf, p, sequenceIdChecker, - server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { - return Status.PREEMPTED; - } - } catch (InterruptedIOException iioe) { - LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); - return Status.RESIGNED; - } catch (IOException e) { - if (e instanceof FileNotFoundException) { - // A wal file may not exist anymore. Nothing can be recovered so move on - LOG.warn("WAL {} does not exist anymore", filename, e); - return Status.DONE; - } - Throwable cause = e.getCause(); - if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException - || cause instanceof ConnectException - || cause instanceof SocketTimeoutException)) { - LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " - + "resigning", e); - return Status.RESIGNED; - } else if (cause instanceof InterruptedException) { - LOG.warn("log splitting of " + filename + " interrupted, resigning", e); - return Status.RESIGNED; - } - LOG.warn("log splitting of " + filename + " failed, returning error", e); - return Status.ERR; - } + public SplitLogWorker(Configuration conf, RegionServerServices server, + LastSequenceId sequenceIdChecker, WALFactory factory) { + this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory)); + } + + // returns whether we need to continue the split work + private static boolean processSyncReplicationWAL(String name, Configuration conf, + RegionServerServices server, FileSystem fs, Path walDir) throws IOException { + Path walFile = new Path(walDir, name); + String filename = walFile.getName(); + Optional<String> optSyncPeerId = + SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename); + if (!optSyncPeerId.isPresent()) { + return true; + } + String peerId = optSyncPeerId.get(); + ReplicationPeerImpl peer = 
+ server.getReplicationSourceService().getReplicationPeers().getPeer(peerId); + if (peer == null || !peer.getPeerConfig().isSyncReplication()) { + return true; + } + Pair<SyncReplicationState, SyncReplicationState> stateAndNewState = + peer.getSyncReplicationStateAndNewState(); + if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) && + stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) { + // copy the file to remote and overwrite the previous one + String remoteWALDir = peer.getPeerConfig().getRemoteWALDir(); + Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); + Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp"); + FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); + try (FSDataInputStream in = fs.open(walFile); @SuppressWarnings("deprecation") + FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true, + FSUtils.getDefaultBufferSize(remoteFs), remoteFs.getDefaultReplication(tmpRemoteWAL), + remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) { + IOUtils.copy(in, out); + } + Path toCommitRemoteWAL = + new Path(remoteWALDirForPeer, filename + ReplicationUtils.RENAME_WAL_SUFFIX); + // Some FileSystem implementations may not support atomic rename so we need to do it in two + // phases + FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL); + FSUtils.renameFile(remoteFs, toCommitRemoteWAL, new Path(remoteWALDirForPeer, filename)); + } else if ((stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) && + stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) || + stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) { + // check whether we still need to process this file + // actually we only write wal file which name is ended with .syncrep in A state, and after + // transiting to a state other than A, we will reopen all the regions so the data in the wal + // will be flushed so the wal file will be 
archived soon. But it is still possible that there + // is a server crash when we are transiting from A to S, to simplify the logic of the transit + // procedure, here we will also check the remote snapshot directory in state S, so that we do + // not need wait until all the wal files with .syncrep suffix to be archived before finishing + // the procedure. + String remoteWALDir = peer.getPeerConfig().getRemoteWALDir(); + Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId); + FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); + if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) { + // the file has been replayed when the remote cluster was transited from S to DA, the + // content will be replicated back to us so give up split it. + LOG.warn("Giveup splitting {} since it has been replayed in the remote cluster and " + + "the content will be replicated back", filename); + return false; + } + } + return true; + } + + private static Status splitLog(String name, CancelableProgressable p, Configuration conf, + RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) { + Path walDir; + FileSystem fs; + try { + walDir = FSUtils.getWALRootDir(conf); + fs = walDir.getFileSystem(conf); + } catch (IOException e) { + LOG.warn("could not find root dir or fs", e); + return Status.RESIGNED; + } + try { + if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) { return Status.DONE; } - }); + } catch (IOException e) { + LOG.warn("failed to process sync replication wal {}", name, e); + return Status.RESIGNED; + } + // TODO have to correctly figure out when log splitting has been + // interrupted or has encountered a transient error and when it has + // encountered a bad non-retry-able persistent error. 
+ try { + if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, + p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), + factory)) { + return Status.PREEMPTED; + } + } catch (InterruptedIOException iioe) { + LOG.warn("log splitting of " + name + " interrupted, resigning", iioe); + return Status.RESIGNED; + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + // A wal file may not exist anymore. Nothing can be recovered so move on + LOG.warn("WAL {} does not exist anymore", name, e); + return Status.DONE; + } + Throwable cause = e.getCause(); + if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException || + cause instanceof ConnectException || cause instanceof SocketTimeoutException)) { + LOG.warn("log replaying of " + name + " can't connect to the target regionserver, " + + "resigning", e); + return Status.RESIGNED; + } else if (cause instanceof InterruptedException) { + LOG.warn("log splitting of " + name + " interrupted, resigning", e); + return Status.RESIGNED; + } + LOG.warn("log splitting of " + name + " failed, returning error", e); + return Status.ERR; + } + return Status.DONE; } @Override @@ -191,6 +266,7 @@ public class SplitLogWorker implements Runnable { * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in * SplitLogManager.TaskFinisher */ + @FunctionalInterface public interface TaskExecutor { enum Status { DONE(), http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java index 8ecfede..4301ae7 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java @@ -32,13 +32,13 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; * An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances. */ @InterfaceAudience.Private -public abstract class CombinedAsyncWriter implements AsyncWriter { +public final class CombinedAsyncWriter implements AsyncWriter { private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class); - protected final ImmutableList<AsyncWriter> writers; + private final ImmutableList<AsyncWriter> writers; - protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) { + private CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) { this.writers = writers; } @@ -66,69 +66,29 @@ public abstract class CombinedAsyncWriter implements AsyncWriter { } } - protected abstract void doSync(CompletableFuture<Long> future); - - @Override - public CompletableFuture<Long> sync() { - CompletableFuture<Long> future = new CompletableFuture<>(); - doSync(future); - return future; - } - @Override public void append(Entry entry) { writers.forEach(w -> w.append(entry)); } - public enum Mode { - SEQUENTIAL, PARALLEL + @Override + public CompletableFuture<Long> sync() { + CompletableFuture<Long> future = new CompletableFuture<>(); + AtomicInteger remaining = new AtomicInteger(writers.size()); + writers.forEach(w -> w.sync().whenComplete((length, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (remaining.decrementAndGet() == 0) { + future.complete(length); + } + })); + return future; } - public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... 
writers) { - ImmutableList<AsyncWriter> ws = - ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build(); - switch (mode) { - case SEQUENTIAL: - return new CombinedAsyncWriter(ws) { - - private void doSync(CompletableFuture<Long> future, Long length, int index) { - if (index == writers.size()) { - future.complete(length); - return; - } - writers.get(index).sync().whenComplete((len, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - doSync(future, len, index + 1); - }); - } - - @Override - protected void doSync(CompletableFuture<Long> future) { - doSync(future, null, 0); - } - }; - case PARALLEL: - return new CombinedAsyncWriter(ws) { - - @Override - protected void doSync(CompletableFuture<Long> future) { - AtomicInteger remaining = new AtomicInteger(writers.size()); - writers.forEach(w -> w.sync().whenComplete((length, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - if (remaining.decrementAndGet() == 0) { - future.complete(length); - } - })); - } - }; - default: - throw new IllegalArgumentException("Unknown mode: " + mode); - } + public static CombinedAsyncWriter create(AsyncWriter writer, AsyncWriter... 
writers) { + return new CombinedAsyncWriter( + ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index a98567a..3967e78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -50,6 +51,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL { this.remoteWalDir = remoteWalDir; } + // will be overridden in testcase + @VisibleForTesting + protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter, + AsyncWriter remoteWriter) { + return CombinedAsyncWriter.create(remoteWriter, localWriter); + } + @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { AsyncWriter localWriter = super.createWriterInstance(path); @@ -66,8 +74,7 @@ public class DualAsyncFSWAL extends AsyncFSWAL { closeWriter(localWriter); } } - return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter, - localWriter); + return createCombinedAsyncWriter(localWriter, remoteWriter); } // Allow temporarily skipping the creation of remote writer. 
When failing to write to the remote http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 2199415..b04f0cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -288,4 +288,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() { return syncReplicationPeerInfoProvider; } + + @Override + public ReplicationPeers getReplicationPeers() { + return replicationPeers; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index f25b073..827cfa9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -652,7 +652,7 @@ public class ReplicationSourceManager implements ReplicationListener { private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals) throws IOException { - Path remoteWALDirForPeer = 
ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId); + Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); for (String wal : wals) { Path walFile = new Path(remoteWALDirForPeer, wal); http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java index 75274ea..170441b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -77,8 +77,7 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv return false; } Pair<SyncReplicationState, SyncReplicationState> states = - peer.getSyncReplicationStateAndNewState(); + peer.getSyncReplicationStateAndNewState(); return checker.test(states.getFirst(), states.getSecond()); } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 8a1f948..5b968db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -843,6 +843,15 @@ public abstract class FSUtils extends 
CommonFSUtils { return frags; } + public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException { + if (fs.exists(dst) && !fs.delete(dst, false)) { + throw new IOException("Can not delete " + dst); + } + if (!fs.rename(src, dst)) { + throw new IOException("Can not rename from " + src + " to " + dst); + } + } + /** * A {@link PathFilter} that returns only regular files. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 8e82d8b..82f8a89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDir import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -51,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.collect.Streams; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -67,8 +70,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private static final Logger LOG = 
LoggerFactory.getLogger(SyncReplicationWALProvider.class); + // only for injecting errors for testcase, do not use it for other purpose. @VisibleForTesting - public static final String LOG_SUFFIX = ".syncrep"; + public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl"; private final WALProvider provider; @@ -126,12 +130,35 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException { - return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), - ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir), - CommonFSUtils.getWALRootDir(conf), - ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId), - getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), - conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass); + Class<? extends DualAsyncFSWAL> clazz = + conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class); + try { + Constructor<?> constructor = null; + for (Constructor<?> c : clazz.getDeclaredConstructors()) { + if (c.getParameterCount() > 0) { + constructor = c; + break; + } + } + if (constructor == null) { + throw new IllegalArgumentException("No valid constructor provided for class " + clazz); + } + constructor.setAccessible(true); + return (DualAsyncFSWAL) constructor.newInstance( + CommonFSUtils.getWALFileSystem(conf), + ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir), + CommonFSUtils.getWALRootDir(conf), + ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId), + getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), + conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX, + eventLoopGroup, channelClass); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + Throwable cause = 
e.getTargetException(); + Throwables.propagateIfPossible(cause, IOException.class); + throw new RuntimeException(cause); + } } private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException { @@ -304,7 +331,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen * </p> */ public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) { - if (!name.endsWith(LOG_SUFFIX)) { + if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) { // fast path to return earlier if the name is not for a sync replication peer. return Optional.empty(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index c6ffeea..6462234 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -1141,7 +1141,7 @@ public class TestReplicationAdmin { LOG.info("Expected error:", e); } TEST_UTIL.getTestFileSystem() - .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND)); + .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND)); hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); assertEquals(SyncReplicationState.ACTIVE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java index 07aa6a8..f73b4f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; -import java.util.Arrays; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,23 +37,18 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; -@RunWith(Parameterized.class) @Category({ RegionServerTests.class, MediumTests.class }) public class TestCombinedAsyncWriter { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class); + HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); @@ -68,15 +61,6 @@ public class TestCombinedAsyncWriter { @Rule public final TestName name = new TestName(); - @Parameter - public CombinedAsyncWriter.Mode mode; - - @Parameters(name = "{index}: mode={0}") - public static List<Object[]> params() { - return Arrays.asList(new Object[] { 
CombinedAsyncWriter.Mode.SEQUENTIAL }, - new Object[] { CombinedAsyncWriter.Mode.PARALLEL }); - } - @BeforeClass public static void setUpBeforeClass() throws Exception { EVENT_LOOP_GROUP = new NioEventLoopGroup(); @@ -125,7 +109,7 @@ public class TestCombinedAsyncWriter { EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); - CombinedAsyncWriter writer = CombinedAsyncWriter.create(mode, writer1, writer2)) { + CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) { ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName, columnCount, recordCount, row, timestamp); try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java new file mode 100644 index 0000000..fb3daf2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + +class DualAsyncFSWALForTest extends DualAsyncFSWAL { + + private boolean localBroken; + + private boolean remoteBroken; + + private CountDownLatch arrive; + + private CountDownLatch resume; + + private final class MyCombinedAsyncWriter implements AsyncWriter { + + private final AsyncWriter localWriter; + + private final AsyncWriter remoteWriter; + + public MyCombinedAsyncWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) { + this.localWriter = localWriter; + this.remoteWriter = remoteWriter; + } + + @Override + public long getLength() { + return localWriter.getLength(); + } + + @Override + public void close() throws IOException { + Closeables.close(localWriter, true); + Closeables.close(remoteWriter, true); + } + + @Override + public CompletableFuture<Long> sync() { 
+ CompletableFuture<Long> localFuture; + CompletableFuture<Long> remoteFuture; + if (!localBroken) { + localFuture = localWriter.sync(); + } else { + localFuture = new CompletableFuture<>(); + localFuture.completeExceptionally(new IOException("Inject error")); + } + if (!remoteBroken) { + remoteFuture = remoteWriter.sync(); + } else { + remoteFuture = new CompletableFuture<>(); + remoteFuture.completeExceptionally(new IOException("Inject error")); + } + return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> { + return localFuture.getNow(0L); + }); + } + + @Override + public void append(Entry entry) { + if (!localBroken) { + localWriter.append(entry); + } + if (!remoteBroken) { + remoteWriter.append(entry); + } + } + } + + public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, + String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, + boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, + Class<? 
extends Channel> channelClass) throws FailedLogCloseException, IOException { + super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists, + prefix, suffix, eventLoopGroup, channelClass); + } + + @Override + protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter, + AsyncWriter remoteWriter) { + return new MyCombinedAsyncWriter(localWriter, remoteWriter); + } + + @Override + protected AsyncWriter createWriterInstance(Path path) throws IOException { + if (arrive != null) { + arrive.countDown(); + try { + resume.await(); + } catch (InterruptedException e) { + } + } + if (localBroken || remoteBroken) { + throw new IOException("WAL broken"); + } + return super.createWriterInstance(path); + } + + public void setLocalBroken() { + this.localBroken = true; + } + + public void setRemoteBroken() { + this.remoteBroken = true; + } + + public void suspendLogRoll() { + arrive = new CountDownLatch(1); + resume = new CountDownLatch(1); + } + + public void waitUntilArrive() throws InterruptedException { + arrive.await(); + } + + public void resumeLogRoll() { + resume.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index de679be..095be90 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -72,9 +72,9 @@ public class SyncReplicationTestBase { protected static String PEER_ID = "1"; - protected static Path remoteWALDir1; + protected static Path REMOTE_WAL_DIR1; - protected static Path 
remoteWALDir2; + protected static Path REMOTE_WAL_DIR2; private static void initTestingUtility(HBaseTestingUtility util, String zkParent) { util.setZkCluster(ZK_UTIL.getZkCluster()); @@ -109,22 +109,22 @@ public class SyncReplicationTestBase { UTIL2.getAdmin().createTable(td); FileSystem fs1 = UTIL1.getTestFileSystem(); FileSystem fs2 = UTIL2.getTestFileSystem(); - remoteWALDir1 = + REMOTE_WAL_DIR1 = new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory()); - remoteWALDir2 = + REMOTE_WAL_DIR2 = new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); UTIL1.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) .setReplicateAllUserTables(false) .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) - .setRemoteWALDir(remoteWALDir2.toUri().toString()).build()); + .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build()); UTIL2.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey()) .setReplicateAllUserTables(false) .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) - .setRemoteWALDir(remoteWALDir1.toUri().toString()).build()); + .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build()); } @AfterClass http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index b663c44..fce0cdf 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -37,8 +37,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSyncReplicationActive.class); - + HBaseClassTestRule.forClass(TestSyncReplicationActive.class); @Test public void testActive() throws Exception { @@ -58,7 +57,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { verifyNotReplicatedThroughRegion(UTIL2, 0, 100); // Ensure that there's no cluster id in remote log entries. - verifyNoClusterIdInRemoteLog(UTIL2, remoteWALDir2, PEER_ID); + verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID); UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.DOWNGRADE_ACTIVE); http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java new file mode 100644 index 0000000..cf8993b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+// Checks that a row written after setRemoteBroken() is readable on the DOWNGRADE_ACTIVE peer.
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationMoreLogsInLocalCopyToRemote extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
+  // Use DualAsyncFSWALForTest as the dual WAL impl on both clusters, before the base setUp().
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    SyncReplicationTestBase.setUp();
+  }
+  // Break the remote WAL, expect the put to fail, then read the row back from the peer.
+  @Test
+  public void testSplitLog() throws Exception {
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    DualAsyncFSWALForTest wal =
+      (DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+    wal.setRemoteBroken(); // presumably makes every later remote WAL write fail
+    try (AsyncConnection conn =
+      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
+      try {
+        table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))).get();
+        fail("Should fail since the rs will crash and we will not retry");
+      } catch (ExecutionException e) {
+        // expected: the RS crashes and setMaxAttempts(1) rules out a retry
+        LOG.info("Expected error:", e);
+      }
+    }
+    UTIL1.waitFor(60000, new ExplainingPredicate<Exception>() { // until row 0 is readable
+
+      @Override
+      public boolean evaluate() throws Exception {
+        try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+          return table.exists(new Get(Bytes.toBytes(0)));
+        }
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "The row is still not available";
+      }
+    });
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    // We should have copied the local log to remote, so we should be able to get the value
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
new file mode 100644
index 0000000..9a6d242
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+// Checks that the local WAL is given up on (not split) and the data is replicated back.
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationMoreLogsInLocalGiveUpSplitting extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
+  // Use DualAsyncFSWALForTest as the dual WAL impl on both clusters, before the base setUp().
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    SyncReplicationTestBase.setUp();
+  }
+  // Row 0 is written normally; row 1 only after setRemoteBroken() and suspendLogRoll().
+  @Test
+  public void testSplitLog() throws Exception {
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+      table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0)));
+    }
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    DualAsyncFSWALForTest wal =
+      (DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+    wal.setRemoteBroken(); // presumably makes every later remote WAL write fail
+    wal.suspendLogRoll(); // presumably holds log rolling until resumeLogRoll()
+    try (AsyncConnection conn =
+      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1)
+        .setWriteRpcTimeout(5, TimeUnit.SECONDS).build();
+      try {
+        table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))).get();
+        fail("Should fail since the rs will hang and we will get a rpc timeout");
+      } catch (ExecutionException e) {
+        // expected: the RS hangs, so the 5 second write RPC timeout fires
+        LOG.info("Expected error:", e);
+      }
+    }
+    wal.waitUntilArrive(); // presumably blocks until the suspended log roll is reached
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    wal.resumeLogRoll();
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
+      // we failed to write this entry to remote so it should not exist
+      assertFalse(table.exists(new Get(Bytes.toBytes(1))));
+    }
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // Make sure that the region is online. We cannot use waitTableAvailable since a table in
+    // STANDBY state cannot be read from the client.
+    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+      try {
+        table.exists(new Get(Bytes.toBytes(0))); // NOTE(review): no fail() if this succeeds
+      } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+        // expected: the client read is rejected with a message mentioning STANDBY
+        assertThat(e.getMessage(), containsString("STANDBY"));
+      }
+    }
+    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    // We give up splitting the whole WAL file, so this record is gone as well.
+    assertTrue(region.get(new Get(Bytes.toBytes(0))).isEmpty());
+    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
+    // finally it should be replicated back
+    waitUntilReplicationDone(UTIL1, 1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
index 7d380c1..0cd1846 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.junit.ClassRule;
 import
org.junit.Test; import org.junit.experimental.categories.Category; @@ -66,12 +65,12 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase SyncReplicationState.ACTIVE); MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem(); - Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer( + Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir( new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID); FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir); assertEquals(1, remoteWALStatus.length); Path remoteWAL = remoteWALStatus[0].getPath(); - assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX)); + assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)); writeAndVerifyReplication(UTIL1, UTIL2, 0, 100); HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME); @@ -81,7 +80,7 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir); assertEquals(1, remoteWALStatus.length); remoteWAL = remoteWALStatus[0].getPath(); - assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX)); + assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)); UTIL1.getAdmin().disableReplicationPeer(PEER_ID); write(UTIL1, 100, 200); http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java index 8526af8..de409fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java @@ -97,25 +97,25 @@ public class TestSyncReplicationStandBy extends SyncReplicationTestBase { writeAndVerifyReplication(UTIL1, UTIL2, 0, 100); // Remove the peers in ACTIVE & STANDBY cluster. - FileSystem fs2 = remoteWALDir2.getFileSystem(UTIL2.getConfiguration()); - Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); + FileSystem fs2 = REMOTE_WAL_DIR2.getFileSystem(UTIL2.getConfiguration()); + Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID))); UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.DOWNGRADE_ACTIVE); - Assert.assertFalse(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); - Assert.assertFalse(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID))); + Assert.assertFalse(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID))); + Assert.assertFalse(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID))); UTIL1.getAdmin().removeReplicationPeer(PEER_ID); - verifyRemovedPeer(PEER_ID, remoteWALDir1, UTIL1); + verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR1, UTIL1); // Peer remoteWAL dir will be renamed to replay WAL dir when transit from S to DA, and the // replay WAL dir will be removed after replaying all WALs, so create a emtpy dir here to test // whether the removeReplicationPeer would remove the remoteWAL dir. 
- fs2.create(getRemoteWALDir(remoteWALDir2, PEER_ID)); - fs2.create(getReplayRemoteWALs(remoteWALDir2, PEER_ID)); - Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); - Assert.assertTrue(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID))); + fs2.create(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)); + fs2.create(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID)); + Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID))); + Assert.assertTrue(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID))); UTIL2.getAdmin().removeReplicationPeer(PEER_ID); - verifyRemovedPeer(PEER_ID, remoteWALDir2, UTIL2); + verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR2, UTIL2); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java index ebb21a4..2563669 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; @@ -151,7 +152,8 @@ public class TestRecoverStandbyProcedure { } private void 
setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException { - Path peerRemoteWALDir = replaySyncReplicationWALManager.getPeerRemoteWALDir(PEER_ID); + Path peerRemoteWALDir = ReplicationUtils + .getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID); if (!fs.exists(peerRemoteWALDir)) { fs.mkdirs(peerRemoteWALDir); } http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index d98b7f85..febe764 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; @@ -618,7 +617,7 @@ public abstract class TestReplicationSourceManager { try { // make sure that we can deal with files which does not exist String walNameNotExists = - "remoteWAL-12345-" + slaveId + ".12345" + SyncReplicationWALProvider.LOG_SUFFIX; + "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX; Path wal = new Path(logDir, walNameNotExists); manager.preLogRoll(wal); manager.postLogRoll(wal); @@ -626,7 +625,7 @@ public abstract class 
TestReplicationSourceManager { Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId); fs.mkdirs(remoteLogDirForPeer); String walName = - "remoteWAL-12345-" + slaveId + ".23456" + SyncReplicationWALProvider.LOG_SUFFIX; + "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX; Path remoteWAL = new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); fs.create(remoteWAL).close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/599080cc/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 69ed44d..8189cef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -86,7 +86,6 @@ public class TestSyncReplicationWALProvider { @Override public boolean checkState(TableName table, BiPredicate<SyncReplicationState, SyncReplicationState> checker) { - // TODO Implement SyncReplicationPeerInfoProvider.isInState return false; } }