HBASE-20432 Cleanup related resources when remove a sync replication peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e29e77e9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e29e77e9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e29e77e9 Branch: refs/heads/HBASE-19064 Commit: e29e77e95521f505d8c55ddc604d41581bd64d8a Parents: 4c6a7c3 Author: huzheng <open...@gmail.com> Authored: Wed Apr 18 20:38:33 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri May 25 10:11:48 2018 +0800 ---------------------------------------------------------------------- .../master/replication/RemovePeerProcedure.java | 10 +++++ .../ReplaySyncReplicationWALManager.java | 8 ++++ .../replication/SyncReplicationTestBase.java | 45 +++++++++++++++++--- .../replication/TestSyncReplicationActive.java | 9 ++-- .../replication/TestSyncReplicationStandBy.java | 31 ++++++++++++-- 5 files changed, 89 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e29e77e9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 82dc07e..7335fe0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -66,9 +66,19 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { env.getReplicationPeerManager().removePeer(peerId); } + private void removeRemoteWALs(MasterProcedureEnv env) throws IOException { + ReplaySyncReplicationWALManager remoteWALManager = + env.getMasterServices().getReplaySyncReplicationWALManager(); + remoteWALManager.removePeerRemoteWALs(peerId); + remoteWALManager.removePeerReplayWALDir(peerId); + } + @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException, ReplicationException { + if (peerConfig.isSyncReplication()) { + removeRemoteWALs(env); + } env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId); if (peerConfig.isSerial()) { env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId); http://git-wip-us.apache.org/repos/asf/hbase/blob/e29e77e9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java index 72f5c37..eac5aa4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java @@ -115,6 +115,14 @@ public class ReplaySyncReplicationWALManager { } } + public void removePeerRemoteWALs(String peerId) throws IOException { + Path remoteWALDir = getPeerRemoteWALDir(peerId); + if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) { + throw new IOException( + "Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId); + } + } + public void initPeerWorkers(String peerId) { BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>(); services.getServerManager().getOnlineServers().keySet() http://git-wip-us.apache.org/repos/asf/hbase/blob/e29e77e9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index 0d5fce8..de679be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; @@ -71,6 +72,10 @@ public class SyncReplicationTestBase { protected static String PEER_ID = "1"; + protected static Path remoteWALDir1; + + protected static Path remoteWALDir2; + private static void initTestingUtility(HBaseTestingUtility util, String zkParent) { util.setZkCluster(ZK_UTIL.getZkCluster()); Configuration conf = util.getConfiguration(); @@ -104,11 +109,11 @@ public class SyncReplicationTestBase { UTIL2.getAdmin().createTable(td); FileSystem fs1 = UTIL1.getTestFileSystem(); FileSystem fs2 = UTIL2.getTestFileSystem(); - Path remoteWALDir1 = - new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(), + remoteWALDir1 = + new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory()); - Path remoteWALDir2 = - new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(), + remoteWALDir2 = + new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); UTIL1.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) @@ -188,7 +193,37 @@ public class SyncReplicationTestBase { protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) { Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); - return new Path(remoteWALDir, PEER_ID); + return getRemoteWALDir(remoteWALDir, peerId); + } + + protected Path getRemoteWALDir(Path remoteWALDir, String peerId) { + return new Path(remoteWALDir, peerId); + } + + protected Path getReplayRemoteWALs(Path remoteWALDir, String peerId) { + return new Path(remoteWALDir, peerId + "-replay"); + } + + protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility) + throws Exception { + ReplicationPeerStorage rps = ReplicationStorageFactory + .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration()); + try { + rps.getPeerSyncReplicationState(peerId); + fail("Should throw exception when get the sync replication state of a removed peer."); + } catch (NullPointerException e) { + // ignore. + } + try { + rps.getPeerNewSyncReplicationState(peerId); + fail("Should throw exception when get the new sync replication state of a removed peer"); + } catch (NullPointerException e) { + // ignore. + } + try (FileSystem fs = utility.getTestFileSystem()) { + Assert.assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId))); + Assert.assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId))); + } } protected void verifyReplicationRequestRejection(HBaseTestingUtility utility, http://git-wip-us.apache.org/repos/asf/hbase/blob/e29e77e9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index f9020a0..b663c44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -58,7 +58,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { verifyNotReplicatedThroughRegion(UTIL2, 0, 100); // Ensure that there's no cluster id in remote log entries. - verifyNoClusterIdInRemoteLog(UTIL2, PEER_ID); + verifyNoClusterIdInRemoteLog(UTIL2, remoteWALDir2, PEER_ID); UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.DOWNGRADE_ACTIVE); @@ -84,12 +84,9 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { write(UTIL2, 200, 300); } - private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, String peerId) - throws Exception { + private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, Path remoteDir, + String peerId) throws Exception { FileSystem fs2 = utility.getTestFileSystem(); - Path remoteDir = - new Path(utility.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(), - "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId)); Assert.assertTrue(files.length > 0); for (FileStatus file : files) { http://git-wip-us.apache.org/repos/asf/hbase/blob/e29e77e9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java index ed61d2a..8526af8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Arrays; + +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -84,13 +87,35 @@ public class TestSyncReplicationStandBy extends SyncReplicationTestBase { assertDisallow(table, t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1"))))); assertDisallow(table, - t -> t - .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")), + t -> t.put( + Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")), new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1"))))); assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row")) - .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))))); + .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))))); } // We should still allow replication writes writeAndVerifyReplication(UTIL1, UTIL2, 0, 100); + + // Remove the peers in ACTIVE & STANDBY cluster. + FileSystem fs2 = remoteWALDir2.getFileSystem(UTIL2.getConfiguration()); + Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); + + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + Assert.assertFalse(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); + Assert.assertFalse(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID))); + + UTIL1.getAdmin().removeReplicationPeer(PEER_ID); + verifyRemovedPeer(PEER_ID, remoteWALDir1, UTIL1); + + // Peer remoteWAL dir will be renamed to replay WAL dir when transit from S to DA, and the + // replay WAL dir will be removed after replaying all WALs, so create a emtpy dir here to test + // whether the removeReplicationPeer would remove the remoteWAL dir. + fs2.create(getRemoteWALDir(remoteWALDir2, PEER_ID)); + fs2.create(getReplayRemoteWALs(remoteWALDir2, PEER_ID)); + Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); + Assert.assertTrue(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID))); + UTIL2.getAdmin().removeReplicationPeer(PEER_ID); + verifyRemovedPeer(PEER_ID, remoteWALDir2, UTIL2); } }