HBASE-20370 Also remove the wal file in remote cluster when we finish 
replicating a file


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c3b02fbe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c3b02fbe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c3b02fbe

Branch: refs/heads/HBASE-19064
Commit: c3b02fbe386a240bac12236d99633fc0ca462c3d
Parents: cd151f1
Author: zhangduo <zhang...@apache.org>
Authored: Tue Apr 17 09:04:56 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Wed May 30 21:01:01 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationUtils.java     |  36 ++++++-
 .../regionserver/ReplicationSource.java         |  38 +++----
 .../ReplicationSourceInterface.java             |  21 +++-
 .../regionserver/ReplicationSourceManager.java  | 108 ++++++++++++++-----
 .../regionserver/ReplicationSourceShipper.java  |  27 ++---
 .../hbase/wal/SyncReplicationWALProvider.java   |  11 +-
 .../replication/ReplicationSourceDummy.java     |  20 ++--
 .../TestReplicationSourceManager.java           | 101 ++++++++++++-----
 8 files changed, 246 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index cb22f57..66e9b01 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -22,14 +22,17 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Helper class for replication.
@@ -37,6 +40,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public final class ReplicationUtils {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationUtils.class);
+
   public static final String REPLICATION_ATTR_NAME = "__rep__";
 
   public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
@@ -176,4 +181,33 @@ public final class ReplicationUtils {
       return tableCFs != null && tableCFs.containsKey(tableName);
     }
   }
+
+  public static FileSystem getRemoteWALFileSystem(Configuration conf, String 
remoteWALDir)
+      throws IOException {
+    return new Path(remoteWALDir).getFileSystem(conf);
+  }
+
+  public static Path getRemoteWALDirForPeer(String remoteWALDir, String 
peerId) {
+    return new Path(remoteWALDir, peerId);
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepForRetries the base sleep time.
+   * @param sleepMultiplier by how many times the default sleeping time is 
augmented
+   * @param maxRetriesMultiplier the max retry multiplier
+   * @return True if <code>sleepMultiplier</code> is &lt; 
<code>maxRetriesMultiplier</code>
+   */
+  public static boolean sleepForRetries(String msg, long sleepForRetries, int 
sleepMultiplier,
+      int maxRetriesMultiplier) {
+    try {
+      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, 
sleepMultiplier);
+      Thread.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+      Thread.currentThread().interrupt();
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4051efe..f25a232 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -89,8 +89,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
 
   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
-  // id of the peer cluster this source replicates to
-  private String peerId;
 
   // The manager of all sources to which we ping back our progress
   protected ReplicationSourceManager manager;
@@ -168,8 +166,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
 
     this.queueId = queueId;
     this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
-    // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.peerId = this.replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = 
this.conf.getInt("replication.source.log.queue.warn", 2);
 
     defaultBandwidth = 
this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -177,8 +173,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 
10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
-    LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
-        + ", currentBandwidth=" + this.currentBandwidth);
+    LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", 
queueId,
+      replicationPeer.getId(), this.currentBandwidth);
   }
 
   private void decorateConf() {
@@ -215,6 +211,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, 
Path>> pairs)
       throws ReplicationException {
+    String peerId = replicationPeer.getId();
     Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
     if (tableCFMap != null) {
       List<String> tableCfs = tableCFMap.get(tableName);
@@ -274,8 +271,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
     replicationEndpoint
-        .init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(), fs, peerId,
-            clusterId, replicationPeer, metrics, tableDescriptors, server));
+      .init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(), fs,
+        replicationPeer.getId(), clusterId, replicationPeer, metrics, 
tableDescriptors, server));
     replicationEndpoint.start();
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
   }
@@ -357,8 +354,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     if (peerBandwidth != currentBandwidth) {
       currentBandwidth = peerBandwidth;
       throttler.setBandwidth((double) currentBandwidth / 10.0);
-      LOG.info("ReplicationSource : " + peerId
-          + " bandwidth throttling changed, currentBandWidth=" + 
currentBandwidth);
+      LOG.info("ReplicationSource : {} bandwidth throttling changed, 
currentBandWidth={}",
+        replicationPeer.getId(), currentBandwidth);
     }
   }
 
@@ -387,15 +384,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     return sleepMultiplier < maxRetriesMultiplier;
   }
 
-  /**
-   * check whether the peer is enabled or not
-   * @return true if the peer is enabled, otherwise false
-   */
-  @Override
-  public boolean isPeerEnabled() {
-    return replicationPeer.isPeerEnabled();
-  }
-
   private void initialize() {
     int sleepMultiplier = 1;
     while (this.isSourceActive()) {
@@ -529,11 +517,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   }
 
   @Override
-  public String getPeerId() {
-    return this.peerId;
-  }
-
-  @Override
   public Path getCurrentPath() {
     // only for testing
     for (ReplicationSourceShipper worker : workerThreads.values()) {
@@ -616,6 +599,11 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     return server.getServerName();
   }
 
+  @Override
+  public ReplicationPeer getPeer() {
+    return replicationPeer;
+  }
+
   Server getServer() {
     return server;
   }
@@ -623,4 +611,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 090b465..3ce5bfe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -104,10 +104,17 @@ public interface ReplicationSourceInterface {
 
   /**
    * Get the id that the source is replicating to.
-   *
    * @return peer id
    */
-  String getPeerId();
+  default String getPeerId() {
+    return getPeer().getId();
+  }
+
+  /**
+   * Get the replication peer instance.
+   * @return the replication peer instance
+   */
+  ReplicationPeer getPeer();
 
   /**
    * Get a string representation of the current statistics
@@ -119,9 +126,17 @@ public interface ReplicationSourceInterface {
   /**
    * @return peer enabled or not
    */
-  boolean isPeerEnabled();
+  default boolean isPeerEnabled() {
+    return getPeer().isPeerEnabled();
+  }
 
   /**
+   * @return whether this is a sync replication peer.
+   */
+  default boolean isSyncReplication() {
+    return getPeer().getPeerConfig().isSyncReplication();
+  }
+  /**
    * @return active or not
    */
   boolean isSourceActive();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 4212597..cbeba23 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -85,20 +87,20 @@ import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * operations.</li>
  * <li>Need synchronized on {@link #walsById}. There are four methods which 
modify it,
  * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link 
#preLogRoll(Path)}.
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and 
{@link #preLogRoll(Path)}.
  * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
  * {@link PeerProcedureHandlerImpl}. So there is no race between {@link 
#addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, 
boolean, String)} is
- * called by {@link ReplicationSourceInterface}. So no race with {@link 
#addPeer(String)}.
+ * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, 
ReplicationSourceInterface)}
+ * is called by {@link ReplicationSourceInterface}. So no race with {@link 
#addPeer(String)}.
  * {@link #removePeer(String)} will terminate the {@link 
ReplicationSourceInterface} firstly, then
  * remove the wals from {@link #walsById}. So no race with {@link 
#removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, 
boolean, String)} and
+ * case need synchronized is {@link #cleanOldLogs(String, boolean, 
ReplicationSourceInterface)} and
  * {@link #preLogRoll(Path)}.</li>
  * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are 
three methods which
  * modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
  * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is 
called by
  * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will 
terminate the
  * {@link ReplicationSourceInterface} firstly, then remove the wals from
  * {@link #walsByIdRecoveredQueues}. And {@link 
ReplicationSourceManager.NodeFailoverWorker#run()}
@@ -154,9 +156,15 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
   private final boolean replicationForBulkLoadDataEnabled;
 
-
   private AtomicLong totalBufferUsed = new AtomicLong();
 
+  // How long should we sleep for each retry when deleting remote wal files 
for sync replication
+  // peer.
+  private final long sleepForRetries;
+  // Maximum number of retries before taking bold actions when deleting remote 
wal files for sync
+  // replication peer.
+  private final int maxRetriesMultiplier;
+
   /**
    * Creates a replication manager and sets the watch on all the other 
registered region servers
    * @param queueStorage the interface for manipulating replication queues
@@ -204,8 +212,11 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
     this.latestPaths = new HashSet<Path>();
-    replicationForBulkLoadDataEnabled = 
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
-      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.replicationForBulkLoadDataEnabled = conf.getBoolean(
+      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, 
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.sleepForRetries = 
this.conf.getLong("replication.source.sync.sleepforretries", 1000);
+    this.maxRetriesMultiplier =
+      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
   }
 
   /**
@@ -494,16 +505,15 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   /**
    * This method will log the current position to storage. And also clean old 
logs from the
    * replication queue.
-   * @param queueId id of the replication queue
-   * @param queueRecovered indicates if this queue comes from another region 
server
+   * @param source the replication source
    * @param entryBatch the wal entry batch we just shipped
    */
-  public void logPositionAndCleanOldLogs(String queueId, boolean 
queueRecovered,
+  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
       WALEntryBatch entryBatch) {
     String fileName = entryBatch.getLastWalPath().getName();
-    abortWhenFail(() -> 
this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
-      entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
-    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
+    abortWhenFail(() -> 
this.queueStorage.setWALPosition(server.getServerName(),
+      source.getQueueId(), fileName, entryBatch.getLastWalPosition(), 
entryBatch.getLastSeqIds()));
+    cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
   }
 
   /**
@@ -511,36 +521,84 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * file is closed and has no more entries.
    * @param log Path to the log
    * @param inclusive whether we should also remove the given log file
-   * @param queueId id of the replication queue
-   * @param queueRecovered Whether this is a recovered queue
+   * @param source the replication source
    */
   @VisibleForTesting
-  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean 
queueRecovered) {
+  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface 
source) {
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    if (queueRecovered) {
-      NavigableSet<String> wals = 
walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+    if (source.isRecovered()) {
+      NavigableSet<String> wals = 
walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
       if (wals != null) {
-        cleanOldLogs(wals, log, inclusive, queueId);
+        cleanOldLogs(wals, log, inclusive, source);
       }
     } else {
       // synchronized on walsById to avoid race with preLogRoll
       synchronized (this.walsById) {
-        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
+        NavigableSet<String> wals = 
walsById.get(source.getQueueId()).get(logPrefix);
         if (wals != null) {
-          cleanOldLogs(wals, log, inclusive, queueId);
+          cleanOldLogs(wals, log, inclusive, source);
+        }
+      }
+    }
+  }
+
+  private void removeRemoteWALs(String peerId, String remoteWALDir, 
Set<String> wals)
+      throws IOException {
+    Path remoteWALDirForPeer = 
ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, 
remoteWALDir);
+    for (String wal : wals) {
+      Path walFile = new Path(remoteWALDirForPeer, wal);
+      try {
+        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+          throw new IOException("Can not delete " + walFile);
         }
+      } catch (FileNotFoundException e) {
+        // Just ignore since this means the file has already been deleted.
+        // The javadoc of the FileSystem.delete methods does not specify the 
behavior of deleting a
+        // nonexistent file, so here we deal with both, i.e., check the return 
value of the
+        // FileSystem.delete, and also catch FNFE.
+        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
       }
     }
   }
 
-  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean 
inclusive, String id) {
+  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean 
inclusive,
+      ReplicationSourceInterface source) {
     NavigableSet<String> walSet = wals.headSet(key, inclusive);
     if (walSet.isEmpty()) {
       return;
     }
     LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
+    // The intention here is that, we want to delete the remote wal files ASAP 
as it may affect the
+    // failover time if you want to transition the remote cluster from S to A. 
And the infinite retry
+    // is not a problem, as if we can not contact the remote HDFS 
cluster, then usually we can
+    // not contact the HBase cluster either, so the replication will be 
blocked anyway.
+    if (source.isSyncReplication()) {
+      String peerId = source.getPeerId();
+      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", 
walSet.size(), remoteWALDir,
+        walSet);
+      for (int sleepMultiplier = 0;;) {
+        try {
+          removeRemoteWALs(peerId, remoteWALDir, walSet);
+          break;
+        } catch (IOException e) {
+          LOG.warn("Failed to delete remote wals from remote dir {} for peer 
{}", remoteWALDir,
+            peerId);
+        }
+        if (!source.isSourceActive()) {
+          // skip the following operations
+          return;
+        }
+        if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", 
sleepForRetries,
+          sleepMultiplier, maxRetriesMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+    String queueId = source.getQueueId();
     for (String wal : walSet) {
-      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), 
id, wal));
+      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), 
queueId, wal));
     }
     walSet.clear();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 11fd660..3f97b5e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static 
org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -91,7 +93,7 @@ public class ReplicationSourceShipper extends Thread {
       if (!source.isPeerEnabled()) {
         // The peer enabled check is in memory, not expensive, so do not need 
to increase the
         // sleep interval as it may cause a long lag when we enable the peer.
-        sleepForRetries("Replication is disabled", 1);
+        sleepForRetries("Replication is disabled", sleepForRetries, 1, 
maxRetriesMultiplier);
         continue;
       }
       try {
@@ -189,7 +191,8 @@ public class ReplicationSourceShipper extends Thread {
       } catch (Exception ex) {
         LOG.warn("{} threw unknown exception:",
           source.getReplicationEndpoint().getClass().getName(), ex);
-        if (sleepForRetries("ReplicationEndpoint threw exception", 
sleepMultiplier)) {
+        if (sleepForRetries("ReplicationEndpoint threw exception", 
sleepForRetries, sleepMultiplier,
+          maxRetriesMultiplier)) {
           sleepMultiplier++;
         }
       }
@@ -228,8 +231,7 @@ public class ReplicationSourceShipper extends Thread {
     // position and the file will be removed soon in cleanOldLogs.
     if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
       batch.getLastWalPosition() != currentPosition) {
-      source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
-        source.isRecovered(), batch);
+      source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
       updated = true;
     }
     // if end of file is true, then we can just skip to the next file in queue.
@@ -282,21 +284,4 @@ public class ReplicationSourceShipper extends Thread {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
-
-  /**
-   * Do the sleeping logic
-   * @param msg Why we sleep
-   * @param sleepMultiplier by how many times the default sleeping time is 
augmented
-   * @return True if <code>sleepMultiplier</code> is &lt; 
<code>maxRetriesMultiplier</code>
-   */
-  public boolean sleepForRetries(String msg, int sleepMultiplier) {
-    try {
-      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, 
sleepMultiplier);
-      Thread.sleep(this.sleepForRetries * sleepMultiplier);
-    } catch (InterruptedException e) {
-      LOG.debug("Interrupted while sleeping between retries");
-      Thread.currentThread().interrupt();
-    }
-    return sleepMultiplier < maxRetriesMultiplier;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 9cbb095..3cd356d42 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -33,11 +33,10 @@ import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
 import 
org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
@@ -118,10 +117,10 @@ public class SyncReplicationWALProvider implements 
WALProvider, PeerActionListen
   }
 
   private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws 
IOException {
-    Path remoteWALDirPath = new Path(remoteWALDir);
-    FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
-    return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
-      CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
+    return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
+      ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
+      CommonFSUtils.getWALRootDir(conf),
+      ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
       getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, 
factory.factoryId),
       conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, 
channelClass);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ec6ec96..67f793d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,12 +39,13 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
-  ReplicationSourceManager manager;
-  String peerClusterId;
-  Path currentPath;
-  MetricsSource metrics;
-  WALFileLengthProvider walFileLengthProvider;
-  AtomicBoolean startup = new AtomicBoolean(false);
+  private ReplicationSourceManager manager;
+  private ReplicationPeer replicationPeer;
+  private String peerClusterId;
+  private Path currentPath;
+  private MetricsSource metrics;
+  private WALFileLengthProvider walFileLengthProvider;
+  private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
@@ -56,6 +56,7 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
+    this.replicationPeer = rp;
   }
 
   @Override
@@ -153,4 +154,9 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
   public ServerName getServerWALsBelongTo() {
     return null;
   }
+
+  @Override
+  public ReplicationPeer getPeer() {
+    return replicationPeer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 482f49a..5ea3173 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -49,19 +51,19 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -133,9 +136,9 @@ public abstract class TestReplicationSourceManager {
 
   protected static ZKWatcher zkw;
 
-  protected static HTableDescriptor htd;
+  protected static TableDescriptor htd;
 
-  protected static HRegionInfo hri;
+  protected static RegionInfo hri;
 
   protected static final byte[] r1 = Bytes.toBytes("r1");
 
@@ -156,6 +159,8 @@ public abstract class TestReplicationSourceManager {
 
   protected static Path logDir;
 
+  protected static Path remoteLogDir;
+
   protected static CountDownLatch latch;
 
   protected static List<String> files = new ArrayList<>();
@@ -185,10 +190,9 @@ public abstract class TestReplicationSourceManager {
     ZKClusterId.setClusterId(zkw, new ClusterId());
     FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
     fs = FileSystem.get(conf);
-    oldLogDir = new Path(utility.getDataTestDir(),
-        HConstants.HREGION_OLDLOGDIR_NAME);
-    logDir = new Path(utility.getDataTestDir(),
-        HConstants.HREGION_LOGDIR_NAME);
+    oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
+    remoteLogDir = 
utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
     replication = new Replication();
     replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
     managerOfCluster = getManagerFromCluster();
@@ -205,19 +209,16 @@ public abstract class TestReplicationSourceManager {
     }
     waitPeer(slaveId, manager, true);
 
-    htd = new HTableDescriptor(test);
-    HColumnDescriptor col = new HColumnDescriptor(f1);
-    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-    htd.addFamily(col);
-    col = new HColumnDescriptor(f2);
-    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
-    htd.addFamily(col);
+    htd = TableDescriptorBuilder.newBuilder(test)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();
 
     scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : htd.getFamiliesKeys()) {
+    for(byte[] fam : htd.getColumnFamilyNames()) {
       scopes.put(fam, 0);
     }
-    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+    hri = 
RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
   }
 
   private static ReplicationSourceManager getManagerFromCluster() {
@@ -248,6 +249,7 @@ public abstract class TestReplicationSourceManager {
   private void cleanLogDir() throws IOException {
     fs.delete(logDir, true);
     fs.delete(oldLogDir, true);
+    fs.delete(remoteLogDir, true);
   }
 
   @Before
@@ -286,10 +288,10 @@ public abstract class TestReplicationSourceManager {
       .addWALActionsListener(new ReplicationSourceWALActionListener(conf, 
replicationManager));
     final WAL wal = wals.getWAL(hri);
     manager.init();
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
-    htd.addFamily(new HColumnDescriptor(f1));
+    TableDescriptor htd = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build();
     NavigableMap<byte[], Integer> scopes = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : htd.getFamiliesKeys()) {
+    for(byte[] fam : htd.getColumnFamilyNames()) {
       scopes.put(fam, 0);
     }
     // Testing normal log rolling every 20
@@ -329,7 +331,11 @@ public abstract class TestReplicationSourceManager {
 
     wal.rollWriter();
 
-    manager.logPositionAndCleanOldLogs("1", false,
+    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+    when(source.getQueueId()).thenReturn("1");
+    when(source.isRecovered()).thenReturn(false);
+    when(source.isSyncReplication()).thenReturn(false);
+    manager.logPositionAndCleanOldLogs(source,
       new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
 
     wal.append(hri,
@@ -404,7 +410,11 @@ public abstract class TestReplicationSourceManager {
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, 
manager.getWalsByIdRecoveredQueues().get(id).get(group));
-    manager.cleanOldLogs(file2, false, id, true);
+    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+    when(source.getQueueId()).thenReturn(id);
+    when(source.isRecovered()).thenReturn(true);
+    when(source.isSyncReplication()).thenReturn(false);
+    manager.cleanOldLogs(file2, false, source);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), 
manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }
@@ -488,14 +498,13 @@ public abstract class TestReplicationSourceManager {
    * corresponding ReplicationSourceInterface correctly cleans up the 
corresponding
    * replication queue and ReplicationPeer.
    * See HBASE-16096.
-   * @throws Exception
    */
   @Test
   public void testPeerRemovalCleanup() throws Exception{
     String replicationSourceImplName = 
conf.get("replication.replicationsource.implementation");
     final String peerId = "FakePeer";
-    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
-        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + 
":/hbase");
+    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + 
":/hbase").build();
     try {
       DummyServer server = new DummyServer();
       ReplicationQueueStorage rq = ReplicationStorageFactory
@@ -504,7 +513,7 @@ public abstract class TestReplicationSourceManager {
       // initialization to throw an exception.
       conf.set("replication.replicationsource.implementation",
           FailInitializeDummyReplicationSource.class.getName());
-      final ReplicationPeers rp = manager.getReplicationPeers();
+      manager.getReplicationPeers();
       // Set up the znode and ReplicationPeer for the fake peer
       // Don't wait for replication source to initialize, we know it won't.
       addPeerAndWait(peerId, peerConfig, false);
@@ -549,8 +558,8 @@ public abstract class TestReplicationSourceManager {
   @Test
   public void testRemovePeerMetricsCleanup() throws Exception {
     final String peerId = "DummyPeer";
-    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
-        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + 
":/hbase");
+    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + 
":/hbase").build();
     try {
       MetricsReplicationSourceSource globalSource = getGlobalSource();
       final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
@@ -582,6 +591,40 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
+  @Test
+  public void testRemoveRemoteWALs() throws IOException {
+    // make sure that we can deal with files which do not exist
+    String walNameNotExists = "remoteWAL.0";
+    Path wal = new Path(logDir, walNameNotExists);
+    manager.preLogRoll(wal);
+    manager.postLogRoll(wal);
+
+    Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
+    fs.mkdirs(remoteLogDirForPeer);
+    String walName = "remoteWAL.1";
+    Path remoteWAL =
+      new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), 
fs.getWorkingDirectory());
+    fs.create(remoteWAL).close();
+    wal = new Path(logDir, walName);
+    manager.preLogRoll(wal);
+    manager.postLogRoll(wal);
+
+    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+    when(source.getPeerId()).thenReturn(slaveId);
+    when(source.getQueueId()).thenReturn(slaveId);
+    when(source.isRecovered()).thenReturn(false);
+    when(source.isSyncReplication()).thenReturn(true);
+    ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
+    when(config.getRemoteWALDir())
+      .thenReturn(remoteLogDir.makeQualified(fs.getUri(), 
fs.getWorkingDirectory()).toString());
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getPeerConfig()).thenReturn(config);
+    when(source.getPeer()).thenReturn(peer);
+    manager.cleanOldLogs(walName, true, source);
+
+    assertFalse(fs.exists(remoteWAL));
+  }
+
   /**
    * Add a peer and wait for it to initialize
    * @param peerId

Reply via email to