HBASE-20434 Also remove remote wals when peer is in DA state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0265be2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0265be2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0265be2

Branch: refs/heads/HBASE-19064
Commit: d0265be28c6ef07a73f35653f139ed6ceea5d23f
Parents: 74ea8e4
Author: zhangduo <zhang...@apache.org>
Authored: Wed Apr 25 17:12:23 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri May 25 10:11:48 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationUtils.java     |   4 +
 ...ransitPeerSyncReplicationStateProcedure.java |   2 +-
 .../regionserver/ReplicationSource.java         |   7 +-
 .../regionserver/ReplicationSourceManager.java  |  86 ++++++++++------
 .../hadoop/hbase/wal/AbstractFSWALProvider.java |  19 ++--
 .../hbase/wal/SyncReplicationWALProvider.java   |  30 +++++-
 .../TestSyncReplicationRemoveRemoteWAL.java     | 101 +++++++++++++++++++
 .../TestReplicationSourceManager.java           |  68 ++++++++-----
 8 files changed, 251 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 66e9b01..069db7a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -191,6 +191,10 @@ public final class ReplicationUtils {
     return new Path(remoteWALDir, peerId);
   }
 
+  public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
+    return new Path(remoteWALDir, peerId);
+  }
+
   /**
    * Do the sleeping logic
    * @param msg Why we sleep


http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 5da2b0c..99fd615 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -211,7 +211,7 @@ public class TransitPeerSyncReplicationStateProcedure
       case CREATE_DIR_FOR_REMOTE_WAL:
         MasterFileSystem mfs = env.getMasterFileSystem();
         Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
-        Path remoteWALDirForPeer = new Path(remoteWALDir, peerId);
+        Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
         FileSystem walFs = mfs.getWALFileSystem();
         try {
           if (walFs.exists(remoteWALDirForPeer)) {
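
For illustration only (not part of the commit): a minimal sketch of how the new ReplicationUtils.getRemoteWALDirForPeer(Path, String) overload is meant to be used by callers such as the procedure above. The directory names below are made up.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.replication.ReplicationUtils;

    public class RemoteWALDirForPeerExample {
      public static void main(String[] args) {
        // Made-up remote WAL root; in the procedure above it is resolved from the master's
        // WAL root dir plus ReplicationUtils.REMOTE_WAL_DIR_NAME.
        Path remoteWALDir = new Path("/hbase/remoteWALs");
        // The new overload simply appends the peer id, so the procedure and the
        // replication source manager resolve the same per-peer directory.
        Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, "1");
        System.out.println(remoteWALDirForPeer); // expected: /hbase/remoteWALs/1
      }
    }
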
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ba665b6..c669622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -550,14 +550,17 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   /**
+   * <p>
    * Split a path to get the start time
+   * </p>
+   * <p>
    * For example: 10.20.20.171%3A60020.1277499063250
+   * </p>
    * @param p path to split
    * @return start time
    */
   private static long getTS(Path p) {
-    int tsIndex = p.getName().lastIndexOf('.') + 1;
-    return Long.parseLong(p.getName().substring(tsIndex));
+    return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
   }
 }


http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2d0d82b..5015129 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -542,20 +544,40 @@ public class ReplicationSourceManager implements ReplicationListener {
     if (source.isRecovered()) {
       NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
       if (wals != null) {
-        cleanOldLogs(wals, log, inclusive, source);
+        NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
+        if (walsToRemove.isEmpty()) {
+          return;
+        }
+        cleanOldLogs(walsToRemove, source);
+        walsToRemove.clear();
       }
     } else {
+      NavigableSet<String> wals;
+      NavigableSet<String> walsToRemove;
       // synchronized on walsById to avoid race with preLogRoll
       synchronized (this.walsById) {
-        NavigableSet<String> wals = walsById.get(source.getQueueId()).get(logPrefix);
-        if (wals != null) {
-          cleanOldLogs(wals, log, inclusive, source);
+        wals = walsById.get(source.getQueueId()).get(logPrefix);
+        if (wals == null) {
+          return;
+        }
+        walsToRemove = wals.headSet(log, inclusive);
+        if (walsToRemove.isEmpty()) {
+          return;
         }
+        walsToRemove = new TreeSet<>(walsToRemove);
+      }
+      // cleanOldLogs may spend some time, especially for sync replication where we may want to
+      // remove remote wals as the remote cluster may have already been down, so we do it outside
+      // the lock to avoid blocking preLogRoll
+      cleanOldLogs(walsToRemove, source);
+      // now let's remove the files in the set
+      synchronized (this.walsById) {
+        wals.removeAll(walsToRemove);
       }
     }
   }
 
-  private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wals)
+  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
       throws IOException {
     Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
     FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
@@ -575,13 +597,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive,
-      ReplicationSourceInterface source) {
-    NavigableSet<String> walSet = wals.headSet(key, inclusive);
-    if (walSet.isEmpty()) {
-      return;
-    }
-    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
+  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
+    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
     // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
     // failover time if you want to transit the remote cluster from S to A. And the infinite retry
     // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
@@ -589,31 +606,38 @@ public class ReplicationSourceManager implements ReplicationListener {
     if (source.isSyncReplication()) {
       String peerId = source.getPeerId();
       String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
-      LOG.debug("Removing {} logs from remote dir {} in the list: {}", walSet.size(), remoteWALDir,
-        walSet);
-      for (int sleepMultiplier = 0;;) {
-        try {
-          removeRemoteWALs(peerId, remoteWALDir, walSet);
-          break;
-        } catch (IOException e) {
-          LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
-            peerId);
-        }
-        if (!source.isSourceActive()) {
-          // skip the following operations
-          return;
-        }
-        if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
-          sleepMultiplier, maxRetriesMultiplier)) {
-          sleepMultiplier++;
+      // Filter out the wals that need to be removed from the remote directory. The name should be
+      // in the special format, and the peer id in the name should match the peer id of the
+      // replication source.
+      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
+        .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+        .collect(Collectors.toList());
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
+        remoteWALDir, remoteWals);
+      if (!remoteWals.isEmpty()) {
+        for (int sleepMultiplier = 0;;) {
+          try {
+            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+            break;
+          } catch (IOException e) {
+            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+              peerId);
+          }
+          if (!source.isSourceActive()) {
+            // skip the following operations
            return;
+          }
+          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+            sleepMultiplier, maxRetriesMultiplier)) {
+            sleepMultiplier++;
+          }
         }
       }
     }
     String queueId = source.getQueueId();
-    for (String wal : walSet) {
+    for (String wal : wals) {
       abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
     }
-    walSet.clear();
   }
 
   // public because of we call it in TestReplicationEmptyWALRecovery


http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index e528624..ccdc95f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -517,6 +517,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     listeners.add(listener);
   }
 
+  private static String getWALNameGroupFromWALName(String name, int group) {
+    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
+    if (matcher.matches()) {
+      return matcher.group(group);
+    } else {
+      throw new IllegalArgumentException(name + " is not a valid wal file name");
+    }
+  }
   /**
    * Get prefix of the log from its name, assuming WAL name in format of
    * log_prefix.filenumber.log_suffix
@@ -526,11 +534,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
    * @see AbstractFSWAL#getCurrentFileName()
    */
   public static String getWALPrefixFromWALName(String name) {
-    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
-    if (matcher.matches()) {
-      return matcher.group(1);
-    } else {
-      throw new IllegalArgumentException(name + " is not a valid wal file name");
-    }
+    return getWALNameGroupFromWALName(name, 1);
+  }
+
+  public static long getWALStartTimeFromWALName(String name) {
+    return Long.parseLong(getWALNameGroupFromWALName(name, 2));
   }
 }
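
For illustration only (not part of the commit): a minimal sketch of what the refactored name helpers are expected to return for the WAL name quoted in the getTS javadoc above.

    import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

    public class WALNameHelperExample {
      public static void main(String[] args) {
        // WAL name in the log_prefix.filenumber format used in the javadoc example.
        String walName = "10.20.20.171%3A60020.1277499063250";
        // Group 1 of WAL_FILE_NAME_PATTERN is the prefix, group 2 the start time.
        System.out.println(AbstractFSWALProvider.getWALPrefixFromWALName(walName));
        // expected: 10.20.20.171%3A60020
        System.out.println(AbstractFSWALProvider.getWALStartTimeFromWALName(walName));
        // expected: 1277499063250
      }
    }
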
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 8faccd7..8e82d8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +50,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -64,7 +67,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
 
   private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
 
-  private static final String LOG_SUFFIX = ".syncrep";
+  @VisibleForTesting
+  public static final String LOG_SUFFIX = ".syncrep";
 
   private final WALProvider provider;
 
@@ -288,4 +292,28 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
       return false;
     }
   }
+
+  private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");
+
+  /**
+   * <p>
+   * Returns the peer id if the wal file name is in the special group for a sync replication peer.
+   * </p>
+   * <p>
+   * The prefix format is <factoryId>-<ts>-<peerId>.
+   * </p>
+   */
+  public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
+    if (!name.endsWith(LOG_SUFFIX)) {
+      // fast path to return early if the name is not for a sync replication peer.
+      return Optional.empty();
+    }
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
+    Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix);
+    if (matcher.matches()) {
+      return Optional.of(matcher.group(1));
+    } else {
+      return Optional.empty();
+    }
+  }
 }
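
For illustration only (not part of the commit): a minimal sketch of getSyncReplicationPeerIdFromWALName. The WAL names are made up, following the <factoryId>-<ts>-<peerId> prefix convention and the .syncrep suffix described above, modeled on the names used in TestReplicationSourceManager below.

    import java.util.Optional;
    import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;

    public class SyncReplicationWALNameExample {
      public static void main(String[] args) {
        // A sync replication WAL: prefix <factoryId>-<ts>-<peerId>, then the start time
        // and the ".syncrep" suffix.
        Optional<String> peerId = SyncReplicationWALProvider
          .getSyncReplicationPeerIdFromWALName("remoteWAL-12345-peer_1.23456.syncrep");
        System.out.println(peerId); // expected: Optional[peer_1]
        // A normal WAL name does not end with ".syncrep", so the fast path returns empty.
        System.out.println(SyncReplicationWALProvider
          .getSyncReplicationPeerIdFromWALName("remoteWAL.1")); // expected: Optional.empty
      }
    }
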
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
new file mode 100644
index 0000000..7d380c1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);
+
+  private void waitUntilDeleted(Path remoteWAL) throws Exception {
+    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !mfs.getWALFileSystem().exists(remoteWAL);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return remoteWAL + " has not been deleted yet";
+      }
+    });
+  }
+
+  @Test
+  public void testRemoveRemoteWAL() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer(
+      new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
+    FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+    assertEquals(1, remoteWALStatus.length);
+    Path remoteWAL = remoteWALStatus[0].getPath();
+    assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    rs.getWalRoller().requestRollAll();
+    // The replicated wal file should eventually be deleted
+    waitUntilDeleted(remoteWAL);
+    remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+    assertEquals(1, remoteWALStatus.length);
+    remoteWAL = remoteWALStatus[0].getPath();
+    assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 100, 200);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+
+    // should still be there since the peer is disabled and we haven't replicated the data yet
+    assertTrue(mfs.getWALFileSystem().exists(remoteWAL));
+
+    UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
+    waitUntilReplicationDone(UTIL2, 200);
+    verifyThroughRegion(UTIL2, 100, 200);
+
+    // Confirm that we will also remove the remote wal files in DA state
+    waitUntilDeleted(remoteWAL);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index cff8ceb..d98b7f85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -592,27 +593,10 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
-  @Test
-  public void testRemoveRemoteWALs() throws IOException {
-    // make sure that we can deal with files which does not exist
-    String walNameNotExists = "remoteWAL.0";
-    Path wal = new Path(logDir, walNameNotExists);
-    manager.preLogRoll(wal);
-    manager.postLogRoll(wal);
-
-    Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
-    fs.mkdirs(remoteLogDirForPeer);
-    String walName = "remoteWAL.1";
-    Path remoteWAL =
-      new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
-    fs.create(remoteWAL).close();
-    wal = new Path(logDir, walName);
-    manager.preLogRoll(wal);
-    manager.postLogRoll(wal);
-
+  private ReplicationSourceInterface mockReplicationSource(String peerId) {
     ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getPeerId()).thenReturn(slaveId);
-    when(source.getQueueId()).thenReturn(slaveId);
+    when(source.getPeerId()).thenReturn(peerId);
+    when(source.getQueueId()).thenReturn(peerId);
     when(source.isRecovered()).thenReturn(false);
     when(source.isSyncReplication()).thenReturn(true);
     ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
@@ -621,17 +605,51 @@
     ReplicationPeer peer = mock(ReplicationPeer.class);
     when(peer.getPeerConfig()).thenReturn(config);
     when(source.getPeer()).thenReturn(peer);
-    manager.cleanOldLogs(walName, true, source);
+    return source;
+  }
 
-    assertFalse(fs.exists(remoteWAL));
+  @Test
+  public void testRemoveRemoteWALs() throws Exception {
+    String peerId2 = slaveId + "_2";
+    addPeerAndWait(peerId2,
+      ReplicationPeerConfig.newBuilder()
+        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(),
+      true);
+    try {
+      // make sure that we can deal with files which do not exist
+      String walNameNotExists =
+        "remoteWAL-12345-" + slaveId + ".12345" + SyncReplicationWALProvider.LOG_SUFFIX;
+      Path wal = new Path(logDir, walNameNotExists);
+      manager.preLogRoll(wal);
+      manager.postLogRoll(wal);
+
+      Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
+      fs.mkdirs(remoteLogDirForPeer);
+      String walName =
+        "remoteWAL-12345-" + slaveId + ".23456" + SyncReplicationWALProvider.LOG_SUFFIX;
+      Path remoteWAL =
+        new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
+      fs.create(remoteWAL).close();
+      wal = new Path(logDir, walName);
+      manager.preLogRoll(wal);
+      manager.postLogRoll(wal);
+
+      ReplicationSourceInterface source = mockReplicationSource(peerId2);
+      manager.cleanOldLogs(walName, true, source);
+      // still there if peer id does not match
+      assertTrue(fs.exists(remoteWAL));
+
+      source = mockReplicationSource(slaveId);
+      manager.cleanOldLogs(walName, true, source);
+      assertFalse(fs.exists(remoteWAL));
+    } finally {
+      removePeerAndWait(peerId2);
+    }
   }
 
   /**
    * Add a peer and wait for it to initialize
-   * @param peerId
-   * @param peerConfig
    * @param waitForSource Whether to wait for replication source to initialize
-   * @throws Exception
    */
   private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
       final boolean waitForSource) throws Exception {