HBASE-20456 Support removing a ReplicationSourceShipper for a special wal group


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/74ea8e49
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/74ea8e49
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/74ea8e49

Branch: refs/heads/HBASE-19064
Commit: 74ea8e497332cae028e59bd6acd171ccc1fc87a9
Parents: d3794dd
Author: zhangduo <zhang...@apache.org>
Authored: Tue Apr 24 22:01:21 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri May 25 10:11:48 2018 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AsyncFSWAL.java      |  1 +
 .../RecoveredReplicationSource.java             | 13 +---
 .../RecoveredReplicationSourceShipper.java      |  7 --
 .../regionserver/ReplicationSource.java         | 13 +++-
 .../regionserver/ReplicationSourceManager.java  | 19 ++++-
 .../regionserver/ReplicationSourceShipper.java  | 20 +++--
 .../ReplicationSourceWALReader.java             |  9 ++-
 .../regionserver/WALEntryStream.java            |  3 +-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java | 28 ++++---
 .../hbase/wal/SyncReplicationWALProvider.java   | 10 ++-
 .../TestReplicationSourceManager.java           |  5 +-
 .../TestSyncReplicationShipperQuit.java         | 81 ++++++++++++++++++++
 .../regionserver/TestWALEntryStream.java        |  4 +-
 13 files changed, 163 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index d98ab75..9b4ce9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -682,6 +682,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   protected void doShutdown() throws IOException {
     waitForSafePoint();
     closeWriter(this.writer);
+    this.writer = null;
     closeExecutor.shutdown();
     try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index a21ca44..f1bb538 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -144,15 +143,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
   }
 
   void tryFinish() {
-    // use synchronize to make sure one last thread will clean the queue
-    synchronized (workerThreads) {
-      Threads.sleep(100);// wait a short while for other worker thread to fully exit
-      boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
-      if (allTasksDone) {
-        this.getSourceMetrics().clear();
-        manager.removeRecoveredSource(this);
-        LOG.info("Finished recovering queue {} with the following stats: {}", 
queueId, getStats());
-      }
+    if (workerThreads.isEmpty()) {
+      this.getSourceMetrics().clear();
+      manager.finishRecoveredSource(this);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 91109cf..b0d4db0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -48,13 +48,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   }
 
   @Override
-  protected void noMoreData() {
-    LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, 
source.getQueueId());
-    source.getSourceMetrics().incrCompletedRecoveryQueue();
-    setWorkerState(WorkerState.FINISHED);
-  }
-
-  @Override
   protected void postFinish() {
     source.tryFinish();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f25a232..ba665b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -62,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -120,6 +121,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private long defaultBandwidth;
   private long currentBandwidth;
   private WALFileLengthProvider walFileLengthProvider;
+  @VisibleForTesting
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
@@ -190,6 +192,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
     if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
+      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
+      // the shipper may quit immediately
+      queue.put(log);
       queues.put(logPrefix, queue);
       if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
@@ -197,8 +202,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
        // still not launched, so it's necessary to check workerThreads before start the worker
         tryStartNewShipper(logPrefix, queue);
       }
+    } else {
+      queue.put(log);
     }
-    queue.put(log);
+
     this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
     int queueSize = queue.size();
@@ -612,5 +619,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return queueStorage;
   }
 
-
+  void removeWorker(ReplicationSourceShipper worker) {
+    workerThreads.remove(worker.walGroupId, worker);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index cbeba23..2d0d82b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -443,12 +443,25 @@ public class ReplicationSourceManager implements ReplicationListener {
   * Clear the metrics and related replication queue of the specified old source
    * @param src source to clear
    */
-  void removeRecoveredSource(ReplicationSourceInterface src) {
-    LOG.info("Done with the recovered queue " + src.getQueueId());
-    this.oldsources.remove(src);
+  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
+    if (!this.oldsources.remove(src)) {
+      return false;
+    }
+    LOG.info("Done with the recovered queue {}", src.getQueueId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
     this.walsByIdRecoveredQueues.remove(src.getQueueId());
+    return true;
+  }
+
+  void finishRecoveredSource(ReplicationSourceInterface src) {
+    synchronized (oldsources) {
+      if (!removeRecoveredSource(src)) {
+        return;
+      }
+    }
+    LOG.info("Finished recovering queue {} with the following stats: {}", 
src.getQueueId(),
+      src.getStats());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 3f97b5e..b1361fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -50,13 +50,13 @@ public class ReplicationSourceShipper extends Thread {
   public enum WorkerState {
     RUNNING,
     STOPPED,
-    FINISHED,  // The worker is done processing a recovered queue
+    FINISHED,  // The worker is done processing a queue
   }
 
   private final Configuration conf;
   protected final String walGroupId;
   protected final PriorityBlockingQueue<Path> queue;
-  private final ReplicationSourceInterface source;
+  private final ReplicationSource source;
 
   // Last position in the log that we sent to ZooKeeper
   // It will be accessed by the stats thread so make it volatile
@@ -73,7 +73,7 @@ public class ReplicationSourceShipper extends Thread {
   protected final int maxRetriesMultiplier;
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+      PriorityBlockingQueue<Path> queue, ReplicationSource source) {
     this.conf = conf;
     this.walGroupId = walGroupId;
     this.queue = queue;
@@ -98,7 +98,7 @@ public class ReplicationSourceShipper extends Thread {
       }
       try {
         WALEntryBatch entryBatch = entryReader.take();
-        // the NO_MORE_DATA instance has no path so do not all shipEdits
+        // the NO_MORE_DATA instance has no path so do not call shipEdits
         if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
           noMoreData();
         } else {
@@ -113,12 +113,20 @@ public class ReplicationSourceShipper extends Thread {
     if (!isFinished()) {
       setWorkerState(WorkerState.STOPPED);
     } else {
+      source.removeWorker(this);
       postFinish();
     }
   }
 
-  // To be implemented by recovered shipper
-  protected void noMoreData() {
+  private void noMoreData() {
+    if (source.isRecovered()) {
+      LOG.debug("Finished recovering queue for group {} of peer {}", 
walGroupId,
+        source.getQueueId());
+      source.getSourceMetrics().incrCompletedRecoveryQueue();
+    } else {
+      LOG.debug("Finished queue for group {} of peer {}", walGroupId, 
source.getQueueId());
+    }
+    setWorkerState(WorkerState.FINISHED);
   }
 
   // To be implemented by recovered shipper

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 64fd48d..61ab7c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -142,7 +142,7 @@ class ReplicationSourceWALReader extends Thread {
             entryBatchQueue.put(batch);
             sleepMultiplier = 1;
           } else { // got no entries and didn't advance position in WAL
-            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
+            handleEmptyWALEntryBatch();
             entryStream.reset(); // reuse stream
           }
         }
@@ -224,10 +224,11 @@ class ReplicationSourceWALReader extends Thread {
     return batch;
   }
 
-  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+  private void handleEmptyWALEntryBatch() throws InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
-    if (source.isRecovered()) {
-      // we're done with queue recovery, shut ourself down
+    if (logQueue.isEmpty()) {
+      // we're done with current queue, either this is a recovered queue, or it is the special group
+      // for a sync replication peer and the peer has been transited to DA or S state.
       setReaderRunning(false);
       // shuts down shipper thread immediately
       entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index b2c199e..0393af4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -304,7 +304,8 @@ class WALEntryStream implements Closeable {
         return true;
       }
     } else {
-      // no more files in queue, this could only happen for recovered queue.
+      // no more files in queue, this could happen for recovered queue, or for a wal group of a sync
+      // replication peer which has already been transited to DA or S.
       setCurrentPath(null);
     }
     return false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 5a3fba3..e528624 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
@@ -247,26 +248,30 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     if (walName == null) {
       throw new IllegalArgumentException("The WAL path couldn't be null");
     }
-    final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
-    return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]);
+    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(walName.getName());
+    if (matcher.matches()) {
+      return Long.parseLong(matcher.group(2));
+    } else {
+      throw new IllegalArgumentException(walName.getName() + " is not a valid wal file name");
+    }
   }
 
   /**
   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
    * description.
    */
-  private static final Pattern pattern =
-    Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
+  private static final Pattern WAL_FILE_NAME_PATTERN =
+    Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?");
 
   /**
   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
-   * &lt;file-creation-timestamp&gt;[.meta]. provider-name is usually made up of a server-name and a
-   * provider-id
+   * &lt;file-creation-timestamp&gt;[.&lt;suffix&gt;]. provider-name is usually made up of a
+   * server-name and a provider-id
    * @param filename name of the file to validate
   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
    */
   public static boolean validateWALFilename(String filename) {
-    return pattern.matcher(filename).matches();
+    return WAL_FILE_NAME_PATTERN.matcher(filename).matches();
   }
 
   /**
@@ -517,10 +522,15 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
    * log_prefix.filenumber.log_suffix
    * @param name Name of the WAL to parse
    * @return prefix of the log
+   * @throws IllegalArgumentException if the name passed in is not a valid wal file name
    * @see AbstractFSWAL#getCurrentFileName()
    */
   public static String getWALPrefixFromWALName(String name) {
-    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
-    return name.substring(0, endIndex);
+    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
+    if (matcher.matches()) {
+      return matcher.group(1);
+    } else {
+      throw new IllegalArgumentException(name + " is not a valid wal file name");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 3b56aa2..8faccd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
 
@@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -113,8 +113,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
     channelClass = eventLoopGroupAndChannelClass.getSecond();
   }
 
+  // Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then
+  // back to A, the log prefix will be changed. This is used to simplify the implementation for
+  // replication source, where we do not need to consider that a terminated shipper could be added
+  // back.
   private String getLogPrefix(String peerId) {
-    return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId;
+    return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId;
   }
 
  private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
@@ -250,7 +254,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   @Override
  public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
       SyncReplicationState to, int stage) {
-    if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) {
+    if (from == SyncReplicationState.ACTIVE) {
       if (stage == 0) {
         Lock lock = createLock.acquireLock(peerId);
         try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 5ea3173..cff8ceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
@@ -393,8 +394,8 @@ public abstract class TestReplicationSourceManager {
     // populate some znodes in the peer znode
     SortedSet<String> files = new TreeSet<>();
     String group = "testgroup";
-    String file1 = group + ".log1";
-    String file2 = group + ".log2";
+    String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1";
+    String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2";
     files.add(file1);
     files.add(file2);
     for (String file : files) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java
new file mode 100644
index 0000000..f6dc3d7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-20456.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationShipperQuit extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationShipperQuit.class);
+
+  @Test
+  public void testShipperQuitWhenDA() throws Exception {
+    // set to serial replication
+    UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+    UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    DualAsyncFSWAL wal =
+      (DualAsyncFSWAL) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+    String walGroupId =
+      AbstractFSWALProvider.getWALPrefixFromWALName(wal.getCurrentFileName().getName());
+    ReplicationSourceShipper shipper =
+      ((ReplicationSource) ((Replication) rs.getReplicationSourceService()).getReplicationManager()
+        .getSource(PEER_ID)).workerThreads.get(walGroupId);
+    assertFalse(shipper.isFinished());
+
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    writeAndVerifyReplication(UTIL1, UTIL2, 100, 200);
+
+    ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
+      .getReplicationManager().getSource(PEER_ID);
+    // the peer is serial so here we can make sure that the previous wals have already been
+    // replicated, and finally the shipper should be removed from the worker pool
+    UTIL1.waitFor(10000, () -> !source.workerThreads.containsKey(walGroupId));
+    assertTrue(shipper.isFinished());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ea8e49/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 35e4f82..fac6f74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -413,9 +413,7 @@ public class TestWALEntryStream {
     batch = reader.take();
     assertEquals(walPath, batch.getLastWalPath());
     assertEquals(5, batch.getNbEntries());
-    // Actually this should be true but we haven't handled this yet since for a normal queue the
-    // last one is always open... Not a big deal for now.
-    assertFalse(batch.isEndOfFile());
+    assertTrue(batch.isEndOfFile());
 
     assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
   }
