HBASE-19079 Support setting up two clusters with A and S state

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f7a7578b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f7a7578b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f7a7578b

Branch: refs/heads/HBASE-19064
Commit: f7a7578b785c81b9fba464b3263506416a025b0f
Parents: 4f3ad07
Author: zhangduo <zhang...@apache.org>
Authored: Tue Apr 10 22:35:19 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri May 25 10:11:48 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerManager.java     |   5 +-
 ...ransitPeerSyncReplicationStateProcedure.java |   2 +-
 .../hbase/regionserver/wal/DualAsyncFSWAL.java  |  14 ++
 .../hadoop/hbase/regionserver/wal/WALUtil.java  |  25 ++-
 .../hbase/replication/ChainWALEntryFilter.java  |  28 +--
 .../ReplaySyncReplicationWALCallable.java       |  27 ++-
 .../SyncReplicationPeerInfoProviderImpl.java    |   6 +-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java |  10 +-
 .../hbase/wal/SyncReplicationWALProvider.java   |  94 ++++++---
 .../org/apache/hadoop/hbase/wal/WALEdit.java    |   8 +-
 .../org/apache/hadoop/hbase/wal/WALFactory.java |   2 +-
 .../replication/TestReplicationAdmin.java       |  33 +--
 .../regionserver/wal/TestWALDurability.java     |   2 +
 .../replication/SyncReplicationTestBase.java    | 185 +++++++++++++++++
 .../hbase/replication/TestSyncReplication.java  | 207 -------------------
 .../replication/TestSyncReplicationActive.java  |  64 ++++++
 .../replication/TestSyncReplicationStandBy.java |  96 +++++++++
 17 files changed, 521 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 41dd6e3..229549e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -68,8 +68,9 @@ public class ReplicationPeerManager {
 
   private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>>
     allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
-      EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.STANDBY,
-      EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.DOWNGRADE_ACTIVE,
+      EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
+      SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
+      SyncReplicationState.DOWNGRADE_ACTIVE,
       EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
 
   ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
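
For reference, the allowed transitions encoded by the map above, written out in full (this just restates the patched code, nothing beyond it):

  // ACTIVE           -> DOWNGRADE_ACTIVE or STANDBY
  // STANDBY          -> DOWNGRADE_ACTIVE
  // DOWNGRADE_ACTIVE -> STANDBY or ACTIVE
  allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(
    SyncReplicationState.ACTIVE,
    EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
    SyncReplicationState.STANDBY,
    EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
    SyncReplicationState.DOWNGRADE_ACTIVE,
    EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));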

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index cc51890..5da2b0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -171,7 +171,7 @@ public class TransitPeerSyncReplicationStateProcedure
         }
         return Flow.HAS_MORE_STATE;
       case REPLAY_REMOTE_WAL_IN_PEER:
-        // TODO: replay remote wal when transiting from S to DA.
+        addChildProcedure(new RecoverStandbyProcedure(peerId));
         setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
         return Flow.HAS_MORE_STATE;
       case REOPEN_ALL_REGIONS_IN_PEER:

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index 0495337..a98567a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -38,6 +38,8 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
 
   private final Path remoteWalDir;
 
+  private volatile boolean skipRemoteWal = false;
+
   public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
       String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
       boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
@@ -51,6 +53,9 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
     AsyncWriter localWriter = super.createWriterInstance(path);
+    if (skipRemoteWal) {
+      return localWriter;
+    }
     AsyncWriter remoteWriter;
     boolean succ = false;
     try {
@@ -64,4 +69,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
     return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
       localWriter);
   }
   }
+
+  // Allow temporarily skipping the creation of the remote writer. When we fail to write to the
+  // remote dfs cluster, we need to reopen the regions and switch to the original wal writer. But
+  // we need to write a close marker when closing a region, and if that fails, the whole rs will
+  // abort. So here we skip creating the remote writer so that the region close marker can still
+  // be written.
+  public void skipRemoteWal() {
+    this.skipRemoteWal = true;
+  }
 }
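
Condensed, the writer selection in createWriterInstance now behaves as below (a sketch of the patched method; createRemoteWriter is a hypothetical stand-in for the try/finally block in the diff that builds the remote writer and closes the local one on failure):

  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    AsyncWriter localWriter = super.createWriterInstance(path);
    if (skipRemoteWal) {
      // Remote cluster unreachable: write locally only so the region close
      // marker can still be persisted while regions are reopened.
      return localWriter;
    }
    AsyncWriter remoteWriter = createRemoteWriter(path); // hypothetical condensation
    return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
      localWriter);
  }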

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 1b17adc..3b18253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,11 +20,13 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.NavigableMap;
-
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -34,7 +36,9 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@@ -179,4 +183,23 @@ public class WALUtil {
     return conf.getLong("hbase.regionserver.hlog.blocksize",
         CommonFSUtils.getDefaultBlockSize(fs, dir) * 2);
   }
+
+  public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
+    ArrayList<Cell> cells = edit.getCells();
+    int size = cells.size();
+    int newSize = 0;
+    for (int i = 0; i < size; i++) {
+      Cell cell = mapper.apply(cells.get(i));
+      if (cell != null) {
+        cells.set(newSize, cell);
+        newSize++;
+      }
+    }
+    for (int i = size - 1; i >= newSize; i--) {
+      cells.remove(i);
+    }
+    if (newSize < size / 2) {
+      cells.trimToSize();
+    }
+  }
 }
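
The new filterCells helper rewrites the edit's cell list in place and trims the backing array once more than half the cells are gone. A small usage sketch (the METAFAMILY predicate mirrors the ReplaySyncReplicationWALCallable change below; edit is assumed to be a WALEdit already in hand):

  // Drop meta-family marker cells from the edit, keep everything else.
  WALUtil.filterCells(edit,
    c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c);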

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 6f2c764..2bb9811 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.replication;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A {@link WALEntryFilter} which contains multiple filters and applies them
@@ -82,22 +82,16 @@ public class ChainWALEntryFilter implements WALEntryFilter {
     if (entry == null || cellFilters.length == 0) {
       return;
     }
-    ArrayList<Cell> cells = entry.getEdit().getCells();
-    int size = cells.size();
-    for (int i = size - 1; i >= 0; i--) {
-      Cell cell = cells.get(i);
-      for (WALCellFilter filter : cellFilters) {
-        cell = filter.filterCell(entry, cell);
-        if (cell != null) {
-          cells.set(i, cell);
-        } else {
-          cells.remove(i);
-          break;
-        }
+    WALUtil.filterCells(entry.getEdit(), c -> filterCell(entry, c));
+  }
+
+  private Cell filterCell(Entry entry, Cell cell) {
+    for (WALCellFilter filter : cellFilters) {
+      cell = filter.filterCell(entry, cell);
+      if (cell == null) {
+        break;
       }
     }
-    if (cells.size() < size / 2) {
-      cells.trimToSize();
-    }
+    return cell;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
index 8dfe3a2..c9c5ef6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -21,21 +21,23 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -129,20 +131,31 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
     }
   }
 
+  // return whether we should include this entry.
+  private boolean filter(Entry entry) {
+    WALEdit edit = entry.getEdit();
+    WALUtil.filterCells(edit, c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c);
+    return !edit.isEmpty();
+  }
+
   private List<Entry> readWALEntries(Reader reader) throws IOException {
     List<Entry> entries = new ArrayList<>();
     if (reader == null) {
       return entries;
     }
     long size = 0;
-    Entry entry = reader.next();
-    while (entry != null) {
-      entries.add(entry);
-      size += entry.getEdit().heapSize();
-      if (size > batchSize) {
+    for (;;) {
+      Entry entry = reader.next();
+      if (entry == null) {
         break;
       }
-      entry = reader.next();
+      if (filter(entry)) {
+        entries.add(entry);
+        size += entry.getEdit().heapSize();
+        if (size > batchSize) {
+          break;
+        }
+      }
     }
     return entries;
   }
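
Restated, the replay loop batches entries by accumulated heap size (a sketch of the patched readWALEntries body; batchSize is a field configured elsewhere in this class):

  long size = 0;
  for (Entry entry; (entry = reader.next()) != null;) {
    if (!filter(entry)) {
      continue; // edit held only META-family markers, nothing to replay
    }
    entries.add(entry);
    size += entry.getEdit().heapSize();
    if (size > batchSize) {
      break; // batch is full, ship what we have
    }
  }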

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index e4afc33..cb33dab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -54,8 +54,10 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
     }
     Pair<SyncReplicationState, SyncReplicationState> states =
         peer.getSyncReplicationStateAndNewState();
-    if (states.getFirst() == SyncReplicationState.ACTIVE &&
-      states.getSecond() == SyncReplicationState.NONE) {
+    if ((states.getFirst() == SyncReplicationState.ACTIVE &&
+      states.getSecond() == SyncReplicationState.NONE) ||
+      (states.getFirst() == SyncReplicationState.DOWNGRADE_ACTIVE &&
+        states.getSecond() == SyncReplicationState.ACTIVE)) {
       return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir()));
     } else {
       return Optional.empty();
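
In other words, a region's writes go through the dual WAL in exactly two situations; a restatement of the predicate above (state is the current sync replication state, newState the transition target):

  // Use DualAsyncFSWAL only when the peer is stably ACTIVE, or while it is
  // transitioning DOWNGRADE_ACTIVE -> ACTIVE.
  boolean useDualWAL =
    (state == SyncReplicationState.ACTIVE && newState == SyncReplicationState.NONE) ||
    (state == SyncReplicationState.DOWNGRADE_ACTIVE && newState == SyncReplicationState.ACTIVE);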

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 3eb8f8f..5a3fba3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -136,8 +136,16 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
         walCopy = wal;
         if (walCopy == null) {
           walCopy = createWAL();
+          boolean succ = false;
+          try {
+            walCopy.init();
+            succ = true;
+          } finally {
+            if (!succ) {
+              walCopy.close();
+            }
+          }
           wal = walCopy;
-          walCopy.init();
         }
       }
     }
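
Publishing the WAL only after init succeeds, and closing it otherwise, is the same init-or-close discipline applied again in SyncReplicationWALProvider below. In isolation the pattern reads (a sketch, with createWAL standing for any factory that returns a closeable WAL):

  T walCopy = createWAL();
  boolean succ = false;
  try {
    walCopy.init();
    succ = true;
  } finally {
    if (!succ) {
      walCopy.close(); // never leak a half-initialized WAL
    }
  }
  wal = walCopy; // publish only after successful init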

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 54287fe..9cbb095 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -69,7 +69,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   private final WALProvider provider;
 
   private SyncReplicationPeerInfoProvider peerInfoProvider =
-      new DefaultSyncReplicationPeerInfoProvider();
+    new DefaultSyncReplicationPeerInfoProvider();
 
   private WALFactory factory;
 
@@ -83,7 +83,11 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
 
   private AtomicBoolean initialized = new AtomicBoolean(false);
 
-  private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>();
+  // When switching from A to DA, we will put an Optional.empty() into this map if there is no
+  // WAL for the peer yet. When getting a WAL from this map, the caller should know that it must
+  // not use DualAsyncFSWAL any more.
+  private final ConcurrentMap<String, Optional<DualAsyncFSWAL>> peerId2WAL =
+    new ConcurrentHashMap<>();
 
   private final KeyLocker<String> createLock = new KeyLocker<>();
 
@@ -123,18 +127,27 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   }
 
   private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
-    DualAsyncFSWAL wal = peerId2WAL.get(peerId);
-    if (wal != null) {
-      return wal;
+    Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
+    if (opt != null) {
+      return opt.orElse(null);
     }
     Lock lock = createLock.acquireLock(peerId);
     try {
-      wal = peerId2WAL.get(peerId);
-      if (wal == null) {
-        wal = createWAL(peerId, remoteWALDir);
-        peerId2WAL.put(peerId, wal);
+      opt = peerId2WAL.get(peerId);
+      if (opt != null) {
+        return opt.orElse(null);
+      }
+      DualAsyncFSWAL wal = createWAL(peerId, remoteWALDir);
+      boolean succ = false;
+      try {
         wal.init();
+        succ = true;
+      } finally {
+        if (!succ) {
+          wal.close();
+        }
       }
+      peerId2WAL.put(peerId, Optional.of(wal));
       return wal;
     } finally {
       lock.unlock();
@@ -146,18 +159,20 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
     if (region == null) {
       return provider.getWAL(null);
     }
+    WAL wal = null;
     Optional<Pair<String, String>> peerIdAndRemoteWALDir =
       peerInfoProvider.getPeerIdAndRemoteWALDir(region);
     if (peerIdAndRemoteWALDir.isPresent()) {
       Pair<String, String> pair = peerIdAndRemoteWALDir.get();
-      return getWAL(pair.getFirst(), pair.getSecond());
-    } else {
-      return provider.getWAL(region);
+      wal = getWAL(pair.getFirst(), pair.getSecond());
     }
+    return wal != null ? wal : provider.getWAL(region);
   }
 
   private Stream<WAL> getWALStream() {
-    return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream());
+    return Streams.concat(
+      peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get),
+      provider.getWALs().stream());
   }
 
   @Override
@@ -169,12 +184,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   public void shutdown() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    for (DualAsyncFSWAL wal : peerId2WAL.values()) {
-      try {
-        wal.shutdown();
-      } catch (IOException e) {
-        LOG.error("Shutdown WAL failed", e);
-        failure = e;
+    for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
+      if (wal.isPresent()) {
+        try {
+          wal.get().shutdown();
+        } catch (IOException e) {
+          LOG.error("Shutdown WAL failed", e);
+          failure = e;
+        }
       }
     }
     provider.shutdown();
@@ -187,12 +204,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   public void close() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    for (DualAsyncFSWAL wal : peerId2WAL.values()) {
-      try {
-        wal.close();
-      } catch (IOException e) {
-        LOG.error("Close WAL failed", e);
-        failure = e;
+    for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
+      if (wal.isPresent()) {
+        try {
+          wal.get().close();
+        } catch (IOException e) {
+          LOG.error("Close WAL failed", e);
+          failure = e;
+        }
       }
     }
     provider.close();
@@ -208,8 +227,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
 
   @Override
   public long getLogFileSize() {
-    return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() +
-      provider.getLogFileSize();
+    return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get)
+      .mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + provider.getLogFileSize();
   }
 
   private void safeClose(WAL wal) {
@@ -231,10 +250,23 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   @Override
   public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
       SyncReplicationState to, int stage) {
-    // TODO: stage 0
-    if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE &&
-      stage == 1) {
-      safeClose(peerId2WAL.remove(peerId));
+    if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) {
+      if (stage == 0) {
+        Lock lock = createLock.acquireLock(peerId);
+        try {
+          Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
+          if (opt != null) {
+            opt.ifPresent(DualAsyncFSWAL::skipRemoteWal);
+          } else {
+            // Add a placeholder to tell getWAL callers not to use DualAsyncFSWAL any more.
+            peerId2WAL.put(peerId, Optional.empty());
+          }
+        } finally {
+          lock.unlock();
+        }
+      } else if (stage == 1) {
+        peerId2WAL.remove(peerId).ifPresent(this::safeClose);
+      }
     }
   }
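
Read together with the DualAsyncFSWAL change above, the two-stage A to DA transition works roughly like this (a restatement of peerSyncReplicationStateChange, not extra code in the patch):

  if (stage == 0) {
    Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
    if (opt != null) {
      // An existing dual WAL: stop creating remote writers from now on.
      opt.ifPresent(DualAsyncFSWAL::skipRemoteWal);
    } else {
      // No WAL yet: park a placeholder so getWAL never builds a dual WAL.
      peerId2WAL.put(peerId, Optional.empty());
    }
  } else if (stage == 1) {
    // Regions have been reopened on plain WALs; drop and close the dual WAL.
    peerId2WAL.remove(peerId).ifPresent(this::safeClose);
  }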
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
index 1d4dc1b..cd0e52e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
 import java.util.ArrayList;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -33,9 +32,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
     HBaseInterfaceAudience.COPROC })
 public class WALEdit implements HeapSize {
-  private static final Logger LOG = LoggerFactory.getLogger(WALEdit.class);
 
   // TODO: Get rid of this; see HBASE-8457
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index afe043f..4f6a898 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -135,7 +135,7 @@ public class WALFactory {
   static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
     LOG.info("Instantiating WALProvider of type {}", clazz);
     try {
-      return clazz.newInstance();
+      return clazz.getDeclaredConstructor().newInstance();
     } catch (Exception e) {
       LOG.error("couldn't set up WALProvider, the configured class is " + 
clazz);
       LOG.debug("Exception details for failure to load WALProvider.", e);
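
Class.newInstance() is deprecated as of Java 9, in part because it propagates checked exceptions thrown by the constructor without declaring them; the replacement used here is the standard idiom:

  // Same effect, but constructor failures now surface wrapped in
  // InvocationTargetException instead of as undeclared checked exceptions.
  WALProvider provider = clazz.getDeclaredConstructor().newInstance();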

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 486ab51..ac98283 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -259,9 +260,11 @@ public class TestReplicationAdmin {
     TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
     ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
 
-    String rootDir = "hdfs://srv1:9999/hbase";
+    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
+    TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_ONE));
     builder.setClusterKey(KEY_ONE);
-    builder.setRemoteWALDir(rootDir);
+    builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
+      TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
     builder.setReplicateAllUserTables(false);
     Map<TableName, List<String>> tableCfs = new HashMap<>();
     tableCfs.put(tableName, new ArrayList<>());
@@ -1081,10 +1084,12 @@ public class TestReplicationAdmin {
       // OK
     }
 
-    String rootDir = "hdfs://srv1:9999/hbase";
+    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
+    TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND));
     builder = ReplicationPeerConfig.newBuilder();
     builder.setClusterKey(KEY_SECOND);
-    builder.setRemoteWALDir(rootDir);
+    builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
+      TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
     builder.setReplicateAllUserTables(false);
     Map<TableName, List<String>> tableCfs = new HashMap<>();
     tableCfs.put(tableName, new ArrayList<>());
@@ -1105,13 +1110,18 @@ public class TestReplicationAdmin {
     assertEquals(SyncReplicationState.ACTIVE,
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
 
-    try {
-      hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
-        SyncReplicationState.STANDBY);
-      fail("Can't transit cluster state from ACTIVE to STANDBY");
-    } catch (Exception e) {
-      // OK
-    }
+    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
+    assertEquals(SyncReplicationState.STANDBY,
+      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
+    assertEquals(SyncReplicationState.ACTIVE,
+      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
 
     hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
       SyncReplicationState.DOWNGRADE_ACTIVE);
@@ -1121,7 +1131,6 @@ public class TestReplicationAdmin {
     hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
     assertEquals(SyncReplicationState.STANDBY,
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
-
     try {
       hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
       fail("Can't transit cluster state from STANDBY to ACTIVE");

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
index 17f24e8..c446306 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
@@ -104,6 +104,7 @@ public class TestWALDurability {
     FileSystem fs = FileSystem.get(conf);
     Path rootDir = new Path(dir + getName());
     CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
+    customFSLog.init();
     HRegion region = initHRegion(tableName, null, null, customFSLog);
     byte[] bytes = Bytes.toBytes(getName());
     Put put = new Put(bytes);
@@ -118,6 +119,7 @@ public class TestWALDurability {
     conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
     fs = FileSystem.get(conf);
     customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
+    customFSLog.init();
     region = initHRegion(tableName, null, null, customFSLog);
 
     customFSLog.resetSyncFlag();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
new file mode 100644
index 0000000..30dbdb5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Base class for testing sync replication.
+ */
+public class SyncReplicationTestBase {
+
+  protected static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
+
+  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+
+  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
+
+  protected static byte[] CF = Bytes.toBytes("cf");
+
+  protected static byte[] CQ = Bytes.toBytes("cq");
+
+  protected static String PEER_ID = "1";
+
+  private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
+    util.setZkCluster(ZK_UTIL.getZkCluster());
+    Configuration conf = util.getConfiguration();
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
+    conf.setInt("replication.source.size.capacity", 102400);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    conf.setFloat("replication.source.ratio", 1.0f);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ZK_UTIL.startMiniZKCluster();
+    initTestingUtility(UTIL1, "/cluster1");
+    initTestingUtility(UTIL2, "/cluster2");
+    UTIL1.startMiniCluster(3);
+    UTIL2.startMiniCluster(3);
+    TableDescriptor td =
+      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
+    UTIL1.getAdmin().createTable(td);
+    UTIL2.getAdmin().createTable(td);
+    FileSystem fs1 = UTIL1.getTestFileSystem();
+    FileSystem fs2 = UTIL2.getTestFileSystem();
+    Path remoteWALDir1 =
+      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
+    Path remoteWALDir2 =
+      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
+    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL1.shutdownMiniCluster();
+    UTIL2.shutdownMiniCluster();
+    ZK_UTIL.shutdownMiniZKCluster();
+  }
+
+  protected final void write(HBaseTestingUtility util, int start, int end) throws IOException {
+    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
+      for (int i = start; i < end; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+  }
+
+  protected final void verify(HBaseTestingUtility util, int start, int end) throws IOException {
+    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
+      for (int i = start; i < end; i++) {
+        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+      }
+    }
+  }
+
+  protected final void verifyThroughRegion(HBaseTestingUtility util, int start, int end)
+      throws IOException {
+    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    for (int i = start; i < end; i++) {
+      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+    }
+  }
+
+  protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtility util, int start,
+      int end) throws IOException {
+    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    for (int i = start; i < end; i++) {
+      assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
+    }
+  }
+
+  protected final void waitUntilReplicationDone(HBaseTestingUtility util, int end)
+      throws Exception {
+    // The reject check is in RSRpcServices, so we can still read through HRegion
+    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not been catched up yet";
+      }
+    });
+  }
+
+  protected final void writeAndVerifyReplication(HBaseTestingUtility util1,
+      HBaseTestingUtility util2, int start, int end) throws Exception {
+    write(util1, start, end);
+    waitUntilReplicationDone(util2, end);
+    verifyThroughRegion(util2, start, end);
+  }
+
+  protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
+    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+    return new Path(remoteWALDir, peerId);
+  }
+}
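
A minimal subclass shows the intended workflow (a hypothetical test for illustration only; the real subclasses follow below):

  @Category({ ReplicationTests.class, LargeTests.class })
  public class TestSyncReplicationSmoke extends SyncReplicationTestBase {

    @Test
    public void testActiveToStandbyShipping() throws Exception {
      // Make cluster 2 the standby and cluster 1 the active side ...
      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
        SyncReplicationState.STANDBY);
      UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
        SyncReplicationState.ACTIVE);
      // ... then writes on cluster 1 must become readable on cluster 2.
      writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
    }
  }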

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
deleted file mode 100644
index 288dcbf..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
-
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestSyncReplication {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestSyncReplication.class);
-
-  private static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
-
-  private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
-
-  private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("SyncRep");
-
-  private static byte[] CF = Bytes.toBytes("cf");
-
-  private static byte[] CQ = Bytes.toBytes("cq");
-
-  private static String PEER_ID = "1";
-
-  private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
-    util.setZkCluster(ZK_UTIL.getZkCluster());
-    Configuration conf = util.getConfiguration();
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
-    conf.setInt("replication.source.size.capacity", 102400);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
-    conf.setInt("zookeeper.recovery.retry", 1);
-    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
-    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
-    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    conf.setLong("replication.sleep.before.failover", 2000);
-    conf.setInt("replication.source.maxretriesmultiplier", 10);
-    conf.setFloat("replication.source.ratio", 1.0f);
-    conf.setBoolean("replication.source.eof.autorecovery", true);
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    ZK_UTIL.startMiniZKCluster();
-    initTestingUtility(UTIL1, "/cluster1");
-    initTestingUtility(UTIL2, "/cluster2");
-    UTIL1.startMiniCluster(3);
-    UTIL2.startMiniCluster(3);
-    TableDescriptor td =
-        TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
-          .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
-    UTIL1.getAdmin().createTable(td);
-    UTIL2.getAdmin().createTable(td);
-    FileSystem fs1 = UTIL1.getTestFileSystem();
-    FileSystem fs2 = UTIL2.getTestFileSystem();
-    Path remoteWALDir1 =
-        new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
-          "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
-    Path remoteWALDir2 =
-        new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
-          "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
-    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
-      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
-        .setReplicateAllUserTables(false)
-        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
-        .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
-      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
-        .setReplicateAllUserTables(false)
-        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
-        .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    UTIL1.shutdownMiniCluster();
-    UTIL2.shutdownMiniCluster();
-    ZK_UTIL.shutdownMiniZKCluster();
-  }
-
-  @FunctionalInterface
-  private interface TableAction {
-
-    void call(Table table) throws IOException;
-  }
-
-  private void assertDisallow(Table table, TableAction action) throws IOException {
-    try {
-      action.call(table);
-    } catch (DoNotRetryIOException | RetriesExhaustedException e) {
-      // expected
-      assertThat(e.getMessage(), containsString("STANDBY"));
-    }
-  }
-
-  @Test
-  public void testStandby() throws Exception {
-    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
-    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
-    Path remoteWALDirForPeer = new Path(remoteWALDir, PEER_ID);
-    assertFalse(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
-    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
-      SyncReplicationState.STANDBY);
-    assertTrue(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
-    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
-      assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
-      assertDisallow(table,
-        t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
-      assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
-      assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
-      assertDisallow(table,
-        t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
-      assertDisallow(table,
-        t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
-      assertDisallow(table,
-        t -> t
-          .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
-            new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
-      assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
-        .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
-    }
-    // But we should still allow replication writes
-    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
-      for (int i = 0; i < 100; i++) {
-        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
-      }
-    }
-    // The reject check is in RSRpcService so we can still read through HRegion
-    HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
-    UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return !region.get(new Get(Bytes.toBytes(99))).isEmpty();
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return "Replication has not been catched up yet";
-      }
-    });
-    for (int i = 0; i < 100; i++) {
-      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
new file mode 100644
index 0000000..f4fb5fe
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationActive extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+
+  @Test
+  public void testActive() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, 100);
+    Thread.sleep(2000);
+    // peer is disabled, so no data has been replicated
+    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    // confirm that the data is there after we convert the peer to DA
+    verify(UTIL2, 0, 100);
+
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
+
+    // shutdown the cluster completely
+    UTIL1.shutdownMiniCluster();
+    // confirm that we can convert to DA even if the remote slave cluster is down
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    write(UTIL2, 200, 300);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
new file mode 100644
index 0000000..ed61d2a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationStandBy.class);
+
+  @FunctionalInterface
+  private interface TableAction {
+
+    void call(Table table) throws IOException;
+  }
+
+  private void assertDisallow(Table table, TableAction action) throws IOException {
+    try {
+      action.call(table);
+    } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+      // expected
+      assertThat(e.getMessage(), containsString("STANDBY"));
+    }
+  }
+
+  @Test
+  public void testStandby() throws Exception {
+    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
+      assertDisallow(table,
+        t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
+      assertDisallow(table,
+        t -> t
+          .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
+            new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
+      assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
+        .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
+    }
+    // We should still allow replication writes
+    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+  }
+}
