HBASE-20434 Also remove remote wals when peer is in DA state

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0265be2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0265be2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0265be2

Branch: refs/heads/HBASE-19064
Commit: d0265be28c6ef07a73f35653f139ed6ceea5d23f
Parents: 74ea8e4
Author: zhangduo <zhang...@apache.org>
Authored: Wed Apr 25 17:12:23 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri May 25 10:11:48 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationUtils.java     |   4 +
 ...ransitPeerSyncReplicationStateProcedure.java |   2 +-
 .../regionserver/ReplicationSource.java         |   7 +-
 .../regionserver/ReplicationSourceManager.java  |  86 ++++++++++------
 .../hadoop/hbase/wal/AbstractFSWALProvider.java |  19 ++--
 .../hbase/wal/SyncReplicationWALProvider.java   |  30 +++++-
 .../TestSyncReplicationRemoveRemoteWAL.java     | 101 +++++++++++++++++++
 .../TestReplicationSourceManager.java           |  68 ++++++++-----
 8 files changed, 251 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 66e9b01..069db7a 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -191,6 +191,10 @@ public final class ReplicationUtils {
     return new Path(remoteWALDir, peerId);
   }
 
+  public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
+    return new Path(remoteWALDir, peerId);
+  }
+
   /**
    * Do the sleeping logic
    * @param msg Why we sleep

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 5da2b0c..99fd615 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -211,7 +211,7 @@ public class TransitPeerSyncReplicationStateProcedure
       case CREATE_DIR_FOR_REMOTE_WAL:
         MasterFileSystem mfs = env.getMasterFileSystem();
         Path remoteWALDir = new Path(mfs.getWALRootDir(), 
ReplicationUtils.REMOTE_WAL_DIR_NAME);
-        Path remoteWALDirForPeer = new Path(remoteWALDir, peerId);
+        Path remoteWALDirForPeer = 
ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
         FileSystem walFs = mfs.getWALFileSystem();
         try {
           if (walFs.exists(remoteWALDirForPeer)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ba665b6..c669622 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -550,14 +550,17 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     }
 
     /**
+     * <p>
      * Split a path to get the start time
+     * </p>
+     * <p>
      * For example: 10.20.20.171%3A60020.1277499063250
+     * </p>
      * @param p path to split
      * @return start time
      */
     private static long getTS(Path p) {
-      int tsIndex = p.getName().lastIndexOf('.') + 1;
-      return Long.parseLong(p.getName().substring(tsIndex));
+      return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2d0d82b..5015129 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -542,20 +544,40 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     if (source.isRecovered()) {
       NavigableSet<String> wals = 
walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
       if (wals != null) {
-        cleanOldLogs(wals, log, inclusive, source);
+        NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
+        if (walsToRemove.isEmpty()) {
+          return;
+        }
+        cleanOldLogs(walsToRemove, source);
+        walsToRemove.clear();
       }
     } else {
+      NavigableSet<String> wals;
+      NavigableSet<String> walsToRemove;
       // synchronized on walsById to avoid race with preLogRoll
       synchronized (this.walsById) {
-        NavigableSet<String> wals = 
walsById.get(source.getQueueId()).get(logPrefix);
-        if (wals != null) {
-          cleanOldLogs(wals, log, inclusive, source);
+        wals = walsById.get(source.getQueueId()).get(logPrefix);
+        if (wals == null) {
+          return;
+        }
+        walsToRemove = wals.headSet(log, inclusive);
+        if (walsToRemove.isEmpty()) {
+          return;
         }
+        walsToRemove = new TreeSet<>(walsToRemove);
+      }
+      // cleanOldLogs may spend some time, especially for sync replication 
where we may want to
+      // remove remote wals as the remote cluster may have already been down, 
so we do it outside
+      // the lock to avoid block preLogRoll
+      cleanOldLogs(walsToRemove, source);
+      // now let's remove the files in the set
+      synchronized (this.walsById) {
+        wals.removeAll(walsToRemove);
       }
     }
   }
 
-  private void removeRemoteWALs(String peerId, String remoteWALDir, 
Set<String> wals)
+  private void removeRemoteWALs(String peerId, String remoteWALDir, 
Collection<String> wals)
       throws IOException {
     Path remoteWALDirForPeer = 
ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
     FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, 
remoteWALDir);
@@ -575,13 +597,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     }
   }
 
-  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean 
inclusive,
-      ReplicationSourceInterface source) {
-    NavigableSet<String> walSet = wals.headSet(key, inclusive);
-    if (walSet.isEmpty()) {
-      return;
-    }
-    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
+  private void cleanOldLogs(NavigableSet<String> wals, 
ReplicationSourceInterface source) {
+    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
     // The intention here is that, we want to delete the remote wal files ASAP 
as it may effect the
     // failover time if you want to transit the remote cluster from S to A. 
And the infinite retry
     // is not a problem, as if we can not contact with the remote HDFS 
cluster, then usually we can
@@ -589,31 +606,38 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     if (source.isSyncReplication()) {
       String peerId = source.getPeerId();
       String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
-      LOG.debug("Removing {} logs from remote dir {} in the list: {}", 
walSet.size(), remoteWALDir,
-        walSet);
-      for (int sleepMultiplier = 0;;) {
-        try {
-          removeRemoteWALs(peerId, remoteWALDir, walSet);
-          break;
-        } catch (IOException e) {
-          LOG.warn("Failed to delete remote wals from remote dir {} for peer 
{}", remoteWALDir,
-            peerId);
-        }
-        if (!source.isSourceActive()) {
-          // skip the following operations
-          return;
-        }
-        if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", 
sleepForRetries,
-          sleepMultiplier, maxRetriesMultiplier)) {
-          sleepMultiplier++;
+      // Filter out the wals need to be removed from the remote directory. Its 
name should be the
+      // special format, and also, the peer id in its name should match the 
peer id for the
+      // replication source.
+      List<String> remoteWals = wals.stream().filter(w -> 
SyncReplicationWALProvider
+        
.getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+        .collect(Collectors.toList());
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", 
remoteWals.size(),
+        remoteWALDir, remoteWals);
+      if (!remoteWals.isEmpty()) {
+        for (int sleepMultiplier = 0;;) {
+          try {
+            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+            break;
+          } catch (IOException e) {
+            LOG.warn("Failed to delete remote wals from remote dir {} for peer 
{}", remoteWALDir,
+              peerId);
+          }
+          if (!source.isSourceActive()) {
+            // skip the following operations
+            return;
+          }
+          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", 
sleepForRetries,
+            sleepMultiplier, maxRetriesMultiplier)) {
+            sleepMultiplier++;
+          }
         }
       }
     }
     String queueId = source.getQueueId();
-    for (String wal : walSet) {
+    for (String wal : wals) {
       abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), 
queueId, wal));
     }
-    walSet.clear();
   }
 
   // public because of we call it in TestReplicationEmptyWALRecovery

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index e528624..ccdc95f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -517,6 +517,14 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
     listeners.add(listener);
   }
 
+  private static String getWALNameGroupFromWALName(String name, int group) {
+    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
+    if (matcher.matches()) {
+      return matcher.group(group);
+    } else {
+      throw new IllegalArgumentException(name + " is not a valid wal file 
name");
+    }
+  }
   /**
    * Get prefix of the log from its name, assuming WAL name in format of
    * log_prefix.filenumber.log_suffix
@@ -526,11 +534,10 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
    * @see AbstractFSWAL#getCurrentFileName()
    */
   public static String getWALPrefixFromWALName(String name) {
-    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
-    if (matcher.matches()) {
-      return matcher.group(1);
-    } else {
-      throw new IllegalArgumentException(name + " is not a valid wal file 
name");
-    }
+    return getWALNameGroupFromWALName(name, 1);
+  }
+
+  public static long getWALStartTimeFromWALName(String name) {
+    return Long.parseLong(getWALNameGroupFromWALName(name, 2));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 8faccd7..8e82d8b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +50,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -64,7 +67,8 @@ public class SyncReplicationWALProvider implements 
WALProvider, PeerActionListen
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SyncReplicationWALProvider.class);
 
-  private static final String LOG_SUFFIX = ".syncrep";
+  @VisibleForTesting
+  public static final String LOG_SUFFIX = ".syncrep";
 
   private final WALProvider provider;
 
@@ -288,4 +292,28 @@ public class SyncReplicationWALProvider implements 
WALProvider, PeerActionListen
       return false;
     }
   }
+
+  private static final Pattern LOG_PREFIX_PATTERN = 
Pattern.compile(".*-\\d+-(.+)");
+
+  /**
+   * <p>
+   * Returns the peer id if the wal file name is in the special group for a 
sync replication peer.
+   * </p>
+   * <p>
+   * The prefix format is &lt;factoryId&gt;-&lt;ts&gt;-&lt;peerId&gt;.
+   * </p>
+   */
+  public static Optional<String> getSyncReplicationPeerIdFromWALName(String 
name) {
+    if (!name.endsWith(LOG_SUFFIX)) {
+      // fast path to return earlier if the name is not for a sync replication 
peer.
+      return Optional.empty();
+    }
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
+    Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix);
+    if (matcher.matches()) {
+      return Optional.of(matcher.group(1));
+    } else {
+      return Optional.empty();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
new file mode 100644
index 0000000..7d380c1
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationRemoveRemoteWAL extends 
SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);
+
+  private void waitUntilDeleted(Path remoteWAL) throws Exception {
+    MasterFileSystem mfs = 
UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !mfs.getWALFileSystem().exists(remoteWAL);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return remoteWAL + " has not been deleted yet";
+      }
+    });
+  }
+
+  @Test
+  public void testRemoveRemoteWAL() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    MasterFileSystem mfs = 
UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer(
+      new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), 
PEER_ID);
+    FileStatus[] remoteWALStatus = 
mfs.getWALFileSystem().listStatus(remoteWALDir);
+    assertEquals(1, remoteWALStatus.length);
+    Path remoteWAL = remoteWALStatus[0].getPath();
+    assertThat(remoteWAL.getName(), 
endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    rs.getWalRoller().requestRollAll();
+    // The replicated wal file should be deleted finally
+    waitUntilDeleted(remoteWAL);
+    remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+    assertEquals(1, remoteWALStatus.length);
+    remoteWAL = remoteWALStatus[0].getPath();
+    assertThat(remoteWAL.getName(), 
endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 100, 200);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+
+    // should still be there since the peer is disabled and we haven't 
replicated the data yet
+    assertTrue(mfs.getWALFileSystem().exists(remoteWAL));
+
+    UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
+    waitUntilReplicationDone(UTIL2, 200);
+    verifyThroughRegion(UTIL2, 100, 200);
+
+    // Confirm that we will also remove the remote wal files in DA state
+    waitUntilDeleted(remoteWAL);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0265be2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index cff8ceb..d98b7f85 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -592,27 +593,10 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
-  @Test
-  public void testRemoveRemoteWALs() throws IOException {
-    // make sure that we can deal with files which does not exist
-    String walNameNotExists = "remoteWAL.0";
-    Path wal = new Path(logDir, walNameNotExists);
-    manager.preLogRoll(wal);
-    manager.postLogRoll(wal);
-
-    Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
-    fs.mkdirs(remoteLogDirForPeer);
-    String walName = "remoteWAL.1";
-    Path remoteWAL =
-      new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), 
fs.getWorkingDirectory());
-    fs.create(remoteWAL).close();
-    wal = new Path(logDir, walName);
-    manager.preLogRoll(wal);
-    manager.postLogRoll(wal);
-
+  private ReplicationSourceInterface mockReplicationSource(String peerId) {
     ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getPeerId()).thenReturn(slaveId);
-    when(source.getQueueId()).thenReturn(slaveId);
+    when(source.getPeerId()).thenReturn(peerId);
+    when(source.getQueueId()).thenReturn(peerId);
     when(source.isRecovered()).thenReturn(false);
     when(source.isSyncReplication()).thenReturn(true);
     ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
@@ -621,17 +605,51 @@ public abstract class TestReplicationSourceManager {
     ReplicationPeer peer = mock(ReplicationPeer.class);
     when(peer.getPeerConfig()).thenReturn(config);
     when(source.getPeer()).thenReturn(peer);
-    manager.cleanOldLogs(walName, true, source);
+    return source;
+  }
 
-    assertFalse(fs.exists(remoteWAL));
+  @Test
+  public void testRemoveRemoteWALs() throws Exception {
+    String peerId2 = slaveId + "_2";
+    addPeerAndWait(peerId2,
+      ReplicationPeerConfig.newBuilder()
+        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + 
":/hbase").build(),
+      true);
+    try {
+      // make sure that we can deal with files which does not exist
+      String walNameNotExists =
+        "remoteWAL-12345-" + slaveId + ".12345" + 
SyncReplicationWALProvider.LOG_SUFFIX;
+      Path wal = new Path(logDir, walNameNotExists);
+      manager.preLogRoll(wal);
+      manager.postLogRoll(wal);
+
+      Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
+      fs.mkdirs(remoteLogDirForPeer);
+      String walName =
+        "remoteWAL-12345-" + slaveId + ".23456" + 
SyncReplicationWALProvider.LOG_SUFFIX;
+      Path remoteWAL =
+        new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), 
fs.getWorkingDirectory());
+      fs.create(remoteWAL).close();
+      wal = new Path(logDir, walName);
+      manager.preLogRoll(wal);
+      manager.postLogRoll(wal);
+
+      ReplicationSourceInterface source = mockReplicationSource(peerId2);
+      manager.cleanOldLogs(walName, true, source);
+      // still there if peer id does not match
+      assertTrue(fs.exists(remoteWAL));
+
+      source = mockReplicationSource(slaveId);
+      manager.cleanOldLogs(walName, true, source);
+      assertFalse(fs.exists(remoteWAL));
+    } finally {
+      removePeerAndWait(peerId2);
+    }
   }
 
   /**
    * Add a peer and wait for it to initialize
-   * @param peerId
-   * @param peerConfig
    * @param waitForSource Whether to wait for replication source to initialize
-   * @throws Exception
    */
   private void addPeerAndWait(final String peerId, final ReplicationPeerConfig 
peerConfig,
       final boolean waitForSource) throws Exception {

Reply via email to