HBASE-11568 Async WAL replication for region replicas

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e28ec724
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e28ec724
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e28ec724

Branch: refs/heads/master
Commit: e28ec72464ccb266c49e5ef42dd87f938e71ffde
Parents: d44e7df
Author: Enis Soztutar <e...@apache.org>
Authored: Tue Aug 19 18:59:22 2014 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Tue Aug 19 18:59:22 2014 -0700

----------------------------------------------------------------------
 .../client/RegionAdminServiceCallable.java      | 131 +++++
 .../hadoop/hbase/client/RpcRetryingCaller.java  |   2 +-
 .../src/main/resources/hbase-default.xml        |  14 +
 .../master/handler/CreateTableHandler.java      |  13 +-
 .../hbase/protobuf/ReplicationProtbufUtil.java  |  19 +-
 .../hbase/regionserver/RSRpcServices.java       |  23 +-
 .../hbase/regionserver/wal/HLogSplitter.java    | 197 ++++---
 .../RegionReplicaReplicationEndpoint.java       | 558 +++++++++++++++++++
 .../regionserver/ReplicationSource.java         |   3 +-
 .../hbase/util/ServerRegionReplicaUtil.java     |  51 ++
 .../hadoop/hbase/HBaseTestingUtility.java       |  35 +-
 .../hbase/regionserver/TestRegionReplicas.java  |  81 +--
 .../regionserver/TestRegionServerNoMaster.java  |  68 +--
 .../hbase/regionserver/wal/TestHLogMethods.java |  27 +-
 .../TestRegionReplicaReplicationEndpoint.java   | 345 ++++++++++++
 ...egionReplicaReplicationEndpointNoMaster.java | 264 +++++++++
 16 files changed, 1637 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
new file mode 100644
index 0000000..d114304
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+
+/**
+ * Similar to {@link RegionServerCallable} but for the AdminService interface. 
This service callable
+ * assumes a Table and row and thus does region locating similar to 
RegionServerCallable.
+ */
+public abstract class RegionAdminServiceCallable<T> implements 
RetryingCallable<T> {
+
+  protected final ClusterConnection connection;
+
+  protected AdminService.BlockingInterface stub;
+
+  protected HRegionLocation location;
+
+  protected final TableName tableName;
+  protected final byte[] row;
+
+  protected final static int MIN_WAIT_DEAD_SERVER = 10000;
+
+  public RegionAdminServiceCallable(ClusterConnection connection, TableName 
tableName, byte[] row) {
+    this(connection, null, tableName, row);
+  }
+
+  public RegionAdminServiceCallable(ClusterConnection connection, 
HRegionLocation location,
+      TableName tableName, byte[] row) {
+    this.connection = connection;
+    this.location = location;
+    this.tableName = tableName;
+    this.row = row;
+  }
+
+  @Override
+  public void prepare(boolean reload) throws IOException {
+    if (Thread.interrupted()) {
+      throw new InterruptedIOException();
+    }
+
+    if (reload || location == null) {
+      location = getLocation(!reload);
+    }
+
+    if (location == null) {
+      // With this exception, there will be a retry.
+      throw new HBaseIOException(getExceptionMessage());
+    }
+
+    this.setStub(connection.getAdmin(location.getServerName()));
+  }
+
+  protected void setStub(AdminService.BlockingInterface stub) {
+    this.stub = stub;
+  }
+
+  public abstract HRegionLocation getLocation(boolean useCache) throws 
IOException;
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+    if (t instanceof SocketTimeoutException ||
+        t instanceof ConnectException ||
+        t instanceof RetriesExhaustedException ||
+        (location != null && 
getConnection().isDeadServer(location.getServerName()))) {
+      // if thrown these exceptions, we clear all the cache entries that
+      // map to that slow/dead server; otherwise, let cache miss and ask
+      // hbase:meta again to find the new location
+      if (this.location != null) 
getConnection().clearCaches(location.getServerName());
+    } else if (t instanceof RegionMovedException) {
+      getConnection().updateCachedLocations(tableName, row, t, location);
+    } else if (t instanceof NotServingRegionException) {
+      // Purge cache entries for this specific region from hbase:meta cache
+      // since we don't call connect(true) when number of retries is 1.
+      getConnection().deleteCachedRegionLocation(location);
+    }
+  }
+
+  /**
+   * @return {@link HConnection} instance used by this Callable.
+   */
+  HConnection getConnection() {
+    return this.connection;
+  }
+
+  //subclasses can override this.
+  protected String getExceptionMessage() {
+    return "There is no location";
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return null;
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
+    if (sleep < MIN_WAIT_DEAD_SERVER
+        && (location == null || 
connection.isDeadServer(location.getServerName()))) {
+      sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
+    }
+    return sleep;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index 286b6fe..9e11a27 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -48,7 +48,7 @@ import com.google.protobuf.ServiceException;
  */
 @InterfaceAudience.Private
 public class RpcRetryingCaller<T> {
-  static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
+  public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
   /**
    * When we started making calls.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml 
b/hbase-common/src/main/resources/hbase-default.xml
index 959c4e1..30d50df 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1390,6 +1390,20 @@ possible configurations would overwhelm and obscure the 
important.
     </description>
   </property>
   <property>
+    <name>hbase.region.replica.replication.enabled</name>
+    <value>false</value>
+    <description>
+      Whether asynchronous WAL replication to the secondary region replicas is 
enabled or not.
+      If this is enabled, a replication peer named 
"region_replica_replication" will be created
+      which will tail the logs and replicate the mutatations to region 
replicas for tables that
+      have region replication > 1. If this is enabled once, disabling this 
replication also
+      requires disabling the replication peer using shell or ReplicationAdmin 
java class.
+      Replication to secondary region replicas works over standard 
inter-cluster replication. 
+      So replication, if disabled explicitly, also has to be enabled by 
setting "hbase.replication" 
+      to true for this feature to work.
+    </description>
+  </property>
+  <property>
     <name>hbase.http.filter.initializers</name>
     <value>org.apache.hadoop.hbase.http.lib.StaticUserWebFilter</value>
     <description>

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
index fa34ba1..4c20dc5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
@@ -51,6 +51,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 
 /**
  * Handler to create a table.
@@ -196,9 +197,8 @@ public class CreateTableHandler extends EventHandler {
    */
   protected void completed(final Throwable exception) {
     releaseTableLock();
-    String msg = exception == null ? null : exception.getMessage();
     LOG.info("Table, " + this.hTableDescriptor.getTableName() + ", creation " +
-        msg == null ? "successful" : "failed. " + msg);
+        (exception == null ? "successful" : "failed. " + exception));
     if (exception != null) {
       removeEnablingTable(this.assignmentManager, 
this.hTableDescriptor.getTableName());
     }
@@ -243,11 +243,16 @@ public class CreateTableHandler extends EventHandler {
       // 5. Add replicas if needed
       regionInfos = addReplicas(hTableDescriptor, regionInfos);
 
-      // 6. Trigger immediate assignment of the regions in round-robin fashion
+      // 6. Setup replication for region replicas if needed
+      if (hTableDescriptor.getRegionReplication() > 1) {
+        ServerRegionReplicaUtil.setupRegionReplicaReplication(conf);
+      }
+
+      // 7. Trigger immediate assignment of the regions in round-robin fashion
       ModifyRegionUtils.assignRegions(assignmentManager, regionInfos);
     }
 
-    // 6. Set table enabled flag up in zk.
+    // 8. Set table enabled flag up in zk.
     try {
       assignmentManager.getTableStateManager().setTableState(tableName,
         ZooKeeperProtos.Table.State.ENABLED);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index fdf4bb3..a50978b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -59,7 +59,7 @@ public class ReplicationProtbufUtil {
   public static void replicateWALEntry(final AdminService.BlockingInterface 
admin,
       final HLog.Entry[] entries) throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-      buildReplicateWALEntryRequest(entries);
+      buildReplicateWALEntryRequest(entries, null);
     PayloadCarryingRpcController controller = new 
PayloadCarryingRpcController(p.getSecond());
     try {
       admin.replicateWALEntry(controller, p.getFirst());
@@ -77,6 +77,19 @@ public class ReplicationProtbufUtil {
    */
   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
       buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
+    return buildReplicateWALEntryRequest(entries, null);
+  }
+
+  /**
+   * Create a new ReplicateWALEntryRequest from a list of HLog entries
+   *
+   * @param entries the HLog entries to be replicated
+   * @param encodedRegionName alternative region name to use if not null
+   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the 
WALEdit values
+   * found.
+   */
+  public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
+      buildReplicateWALEntryRequest(final HLog.Entry[] entries, byte[] 
encodedRegionName) {
     // Accumulate all the KVs seen in here.
     List<List<? extends Cell>> allkvs = new ArrayList<List<? extends 
Cell>>(entries.length);
     int size = 0;
@@ -91,7 +104,9 @@ public class ReplicationProtbufUtil {
       WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
       HLogKey key = entry.getKey();
       keyBuilder.setEncodedRegionName(
-        ByteStringer.wrap(key.getEncodedRegionName()));
+        ByteStringer.wrap(encodedRegionName == null
+            ? key.getEncodedRegionName()
+            : encodedRegionName));
       keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
       keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
       keyBuilder.setWriteTime(key.getWriteTime());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 10da06d..0f89a8b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -156,6 +157,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.net.DNS;
 import org.apache.zookeeper.KeeperException;
@@ -1358,13 +1360,26 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         // empty input
         return ReplicateWALEntryResponse.newBuilder().build();
       }
-      HRegion region = regionServer.getRegionByEncodedName(
-        entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
-      RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
+      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
+      HRegion region = 
regionServer.getRegionByEncodedName(regionName.toStringUtf8());
+      RegionCoprocessorHost coprocessorHost =
+          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
+            ? region.getCoprocessorHost()
+            : null; // do not invoke coprocessors if this is a secondary 
region replica
       List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, 
WALEdit>>();
       // when tag is enabled, we need tag replay edits with log sequence number
       boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 
3);
+
+      // Skip adding the edits to WAL if this is a secondary region replica
+      boolean isPrimary = 
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
+      Durability durability = isPrimary ? Durability.USE_DEFAULT : 
Durability.SKIP_WAL;
+
       for (WALEntry entry : entries) {
+        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
+          throw new NotServingRegionException("Replay request contains entries 
from multiple " +
+              "regions. First region:" + regionName.toStringUtf8() + " , other 
region:"
+              + entry.getKey().getEncodedRegionName());
+        }
         if (regionServer.nonceManager != null) {
           long nonceGroup = entry.getKey().hasNonceGroup()
             ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
@@ -1374,7 +1389,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
           new Pair<HLogKey, WALEdit>();
         List<HLogSplitter.MutationReplay> edits = 
HLogSplitter.getMutationsFromWALEntry(entry,
-          cells, walEntry, needAddReplayTag);
+          cells, walEntry, needAddReplayTag, durability);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each 
WALEdit instead of a
           // KeyValue.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 815730c..2df9f50 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -133,6 +133,7 @@ public class HLogSplitter {
 
   // Major subcomponents of the split process.
   // These are separated into inner classes to make testing easier.
+  PipelineController controller;
   OutputSink outputSink;
   EntryBuffers entryBuffers;
 
@@ -141,14 +142,6 @@ public class HLogSplitter {
   private ZooKeeperWatcher watcher;
   private CoordinatedStateManager csm;
 
-  // If an exception is thrown by one of the other threads, it will be
-  // stored here.
-  protected AtomicReference<Throwable> thrown = new 
AtomicReference<Throwable>();
-
-  // Wait/notify for when data has been produced by the reader thread,
-  // consumed by the reader thread, or an exception occurred
-  final Object dataAvailable = new Object();
-
   private MonitoredTask status;
 
   // For checking the latest flushed sequence id
@@ -184,8 +177,9 @@ public class HLogSplitter {
     this.sequenceIdChecker = idChecker;
     this.watcher = zkw;
     this.csm = csm;
+    this.controller = new PipelineController();
 
-    entryBuffers = new EntryBuffers(
+    entryBuffers = new EntryBuffers(controller,
         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
             128*1024*1024));
 
@@ -196,13 +190,13 @@ public class HLogSplitter {
 
     this.numWriterThreads = 
this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
     if (zkw != null && csm != null && this.distributedLogReplay) {
-      outputSink = new LogReplayOutputSink(numWriterThreads);
+      outputSink = new LogReplayOutputSink(controller, entryBuffers, 
numWriterThreads);
     } else {
       if (this.distributedLogReplay) {
         LOG.info("ZooKeeperWatcher is passed in as NULL so disable 
distrubitedLogRepaly.");
       }
       this.distributedLogReplay = false;
-      outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
+      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, 
numWriterThreads);
     }
 
   }
@@ -636,22 +630,6 @@ public class HLogSplitter {
     }
   }
 
-  private void writerThreadError(Throwable t) {
-    thrown.compareAndSet(null, t);
-  }
-
-  /**
-   * Check for errors in the writer threads. If any is found, rethrow it.
-   */
-  private void checkForErrors() throws IOException {
-    Throwable thrown = this.thrown.get();
-    if (thrown == null) return;
-    if (thrown instanceof IOException) {
-      throw new IOException(thrown);
-    } else {
-      throw new RuntimeException(thrown);
-    }
-  }
   /**
    * Create a new {@link Writer} for writing log splits.
    */
@@ -680,13 +658,45 @@ public class HLogSplitter {
   }
 
   /**
+   * Contains some methods to control WAL-entries producer / consumer 
interactions
+   */
+  public static class PipelineController {
+    // If an exception is thrown by one of the other threads, it will be
+    // stored here.
+    AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+    // Wait/notify for when data has been produced by the writer thread,
+    // consumed by the reader thread, or an exception occurred
+    public final Object dataAvailable = new Object();
+
+    void writerThreadError(Throwable t) {
+      thrown.compareAndSet(null, t);
+    }
+
+    /**
+     * Check for errors in the writer threads. If any is found, rethrow it.
+     */
+    void checkForErrors() throws IOException {
+      Throwable thrown = this.thrown.get();
+      if (thrown == null) return;
+      if (thrown instanceof IOException) {
+        throw new IOException(thrown);
+      } else {
+        throw new RuntimeException(thrown);
+      }
+    }
+  }
+
+  /**
    * Class which accumulates edits and separates them into a buffer per region
    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
    * a predefined threshold.
    *
    * Writer threads then pull region-specific buffers from this class.
    */
-  class EntryBuffers {
+  public static class EntryBuffers {
+    PipelineController controller;
+
     Map<byte[], RegionEntryBuffer> buffers =
       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
 
@@ -698,7 +708,8 @@ public class HLogSplitter {
     long totalBuffered = 0;
     long maxHeapUsage;
 
-    EntryBuffers(long maxHeapUsage) {
+    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+      this.controller = controller;
       this.maxHeapUsage = maxHeapUsage;
     }
 
@@ -709,7 +720,7 @@ public class HLogSplitter {
      * @throws InterruptedException
      * @throws IOException
      */
-    void appendEntry(Entry entry) throws InterruptedException, IOException {
+    public void appendEntry(Entry entry) throws InterruptedException, 
IOException {
       HLogKey key = entry.getKey();
 
       RegionEntryBuffer buffer;
@@ -724,15 +735,15 @@ public class HLogSplitter {
       }
 
       // If we crossed the chunk threshold, wait for more space to be available
-      synchronized (dataAvailable) {
+      synchronized (controller.dataAvailable) {
         totalBuffered += incrHeap;
-        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
+        while (totalBuffered > maxHeapUsage && controller.thrown.get() == 
null) {
           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, 
waiting for IO threads...");
-          dataAvailable.wait(2000);
+          controller.dataAvailable.wait(2000);
         }
-        dataAvailable.notifyAll();
+        controller.dataAvailable.notifyAll();
       }
-      checkForErrors();
+      controller.checkForErrors();
     }
 
     /**
@@ -765,16 +776,30 @@ public class HLogSplitter {
       }
       long size = buffer.heapSize();
 
-      synchronized (dataAvailable) {
+      synchronized (controller.dataAvailable) {
         totalBuffered -= size;
         // We may unblock writers
-        dataAvailable.notifyAll();
+        controller.dataAvailable.notifyAll();
       }
     }
 
     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
       return currentlyWriting.contains(region);
     }
+
+    public void waitUntilDrained() {
+      synchronized (controller.dataAvailable) {
+        while (totalBuffered > 0) {
+          try {
+            controller.dataAvailable.wait(2000);
+          } catch (InterruptedException e) {
+            LOG.warn("Got intrerrupted while waiting for EntryBuffers is 
drained");
+            Thread.interrupted();
+            break;
+          }
+        }
+      }
+    }
   }
 
   /**
@@ -783,7 +808,7 @@ public class HLogSplitter {
    * share a single byte array instance for the table and region name.
    * Also tracks memory usage of the accumulated edits.
    */
-  static class RegionEntryBuffer implements HeapSize {
+  public static class RegionEntryBuffer implements HeapSize {
     long heapInBuffer = 0;
     List<Entry> entryBuffer;
     TableName tableName;
@@ -815,14 +840,30 @@ public class HLogSplitter {
     public long heapSize() {
       return heapInBuffer;
     }
+
+    public byte[] getEncodedRegionName() {
+      return encodedRegionName;
+    }
+
+    public List<Entry> getEntryBuffer() {
+      return entryBuffer;
+    }
+
+    public TableName getTableName() {
+      return tableName;
+    }
   }
 
-  class WriterThread extends Thread {
+  public static class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
+    private PipelineController controller;
+    private EntryBuffers entryBuffers;
     private OutputSink outputSink = null;
 
-    WriterThread(OutputSink sink, int i) {
+    WriterThread(PipelineController controller, EntryBuffers entryBuffers, 
OutputSink sink, int i){
       super(Thread.currentThread().getName() + "-Writer-" + i);
+      this.controller = controller;
+      this.entryBuffers = entryBuffers;
       outputSink = sink;
     }
 
@@ -832,7 +873,7 @@ public class HLogSplitter {
         doRun();
       } catch (Throwable t) {
         LOG.error("Exiting thread", t);
-        writerThreadError(t);
+        controller.writerThreadError(t);
       }
     }
 
@@ -842,12 +883,12 @@ public class HLogSplitter {
         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
         if (buffer == null) {
           // No data currently available, wait on some more to show up
-          synchronized (dataAvailable) {
+          synchronized (controller.dataAvailable) {
             if (shouldStop && !this.outputSink.flush()) {
               return;
             }
             try {
-              dataAvailable.wait(500);
+              controller.dataAvailable.wait(500);
             } catch (InterruptedException ie) {
               if (!shouldStop) {
                 throw new RuntimeException(ie);
@@ -871,9 +912,9 @@ public class HLogSplitter {
     }
 
     void finish() {
-      synchronized (dataAvailable) {
+      synchronized (controller.dataAvailable) {
         shouldStop = true;
-        dataAvailable.notifyAll();
+        controller.dataAvailable.notifyAll();
       }
     }
   }
@@ -882,7 +923,10 @@ public class HLogSplitter {
    * The following class is an abstraction class to provide a common interface 
to support both
    * existing recovered edits file sink and region server WAL edits replay sink
    */
-   abstract class OutputSink {
+  public static abstract class OutputSink {
+
+    protected PipelineController controller;
+    protected EntryBuffers entryBuffers;
 
     protected Map<byte[], SinkWriter> writers = Collections
         .synchronizedMap(new TreeMap<byte[], 
SinkWriter>(Bytes.BYTES_COMPARATOR));;
@@ -908,8 +952,10 @@ public class HLogSplitter {
 
     protected List<Path> splits = null;
 
-    public OutputSink(int numWriters) {
+    public OutputSink(PipelineController controller, EntryBuffers 
entryBuffers, int numWriters) {
       numThreads = numWriters;
+      this.controller = controller;
+      this.entryBuffers = entryBuffers;
     }
 
     void setReporter(CancelableProgressable reporter) {
@@ -919,9 +965,9 @@ public class HLogSplitter {
     /**
      * Start the threads that will pump data from the entryBuffers to the 
output files.
      */
-    synchronized void startWriterThreads() {
+    public synchronized void startWriterThreads() {
       for (int i = 0; i < numThreads; i++) {
-        WriterThread t = new WriterThread(this, i);
+        WriterThread t = new WriterThread(controller, entryBuffers, this, i);
         t.start();
         writerThreads.add(t);
       }
@@ -980,34 +1026,34 @@ public class HLogSplitter {
           throw iie;
         }
       }
-      checkForErrors();
+      controller.checkForErrors();
       LOG.info("Split writers finished");
       return (!progress_failed);
     }
 
-    abstract List<Path> finishWritingAndClose() throws IOException;
+    public abstract List<Path> finishWritingAndClose() throws IOException;
 
     /**
      * @return a map from encoded region ID to the number of edits written out 
for that region.
      */
-    abstract Map<byte[], Long> getOutputCounts();
+    public abstract Map<byte[], Long> getOutputCounts();
 
     /**
      * @return number of regions we've recovered
      */
-    abstract int getNumberOfRecoveredRegions();
+    public abstract int getNumberOfRecoveredRegions();
 
     /**
      * @param buffer A WAL Edit Entry
      * @throws IOException
      */
-    abstract void append(RegionEntryBuffer buffer) throws IOException;
+    public abstract void append(RegionEntryBuffer buffer) throws IOException;
 
     /**
      * WriterThread call this function to help flush internal remaining edits 
in buffer before close
      * @return true when underlying sink has something to flush
      */
-    protected boolean flush() throws IOException {
+    public boolean flush() throws IOException {
       return false;
     }
   }
@@ -1017,13 +1063,14 @@ public class HLogSplitter {
    */
   class LogRecoveredEditsOutputSink extends OutputSink {
 
-    public LogRecoveredEditsOutputSink(int numWriters) {
+    public LogRecoveredEditsOutputSink(PipelineController controller, 
EntryBuffers entryBuffers,
+        int numWriters) {
       // More threads could potentially write faster at the expense
       // of causing more disk seeks as the logs are split.
       // 3. After a certain setting (probably around 3) the
       // process will be bound on the reader in the current
       // implementation anyway.
-      super(numWriters);
+      super(controller, entryBuffers, numWriters);
     }
 
     /**
@@ -1031,7 +1078,7 @@ public class HLogSplitter {
      * @throws IOException
      */
     @Override
-    List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose() throws IOException {
       boolean isSuccessful = false;
       List<Path> result = null;
       try {
@@ -1247,7 +1294,7 @@ public class HLogSplitter {
     }
 
     @Override
-    void append(RegionEntryBuffer buffer) throws IOException {
+    public void append(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn("got an empty buffer, skipping");
@@ -1287,7 +1334,7 @@ public class HLogSplitter {
      * @return a map from encoded region ID to the number of edits written out 
for that region.
      */
     @Override
-    Map<byte[], Long> getOutputCounts() {
+    public Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<byte[], 
Long>(Bytes.BYTES_COMPARATOR);
       synchronized (writers) {
         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
@@ -1298,7 +1345,7 @@ public class HLogSplitter {
     }
 
     @Override
-    int getNumberOfRecoveredRegions() {
+    public int getNumberOfRecoveredRegions() {
       return writers.size();
     }
   }
@@ -1306,7 +1353,7 @@ public class HLogSplitter {
   /**
    * Class wraps the actual writer which writes data out and related statistics
    */
-  private abstract static class SinkWriter {
+  public abstract static class SinkWriter {
     /* Count of edits written to this path */
     long editsWritten = 0;
     /* Number of nanos spent writing to this log */
@@ -1367,16 +1414,18 @@ public class HLogSplitter {
     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
     private boolean hasEditsInDisablingOrDisabledTables = false;
 
-    public LogReplayOutputSink(int numWriters) {
-      super(numWriters);
+    public LogReplayOutputSink(PipelineController controller, EntryBuffers 
entryBuffers,
+        int numWriters) {
+      super(controller, entryBuffers, numWriters);
       this.waitRegionOnlineTimeOut = 
conf.getInt("hbase.splitlog.manager.timeout",
         SplitLogManager.DEFAULT_TIMEOUT);
-      this.logRecoveredEditsOutputSink = new 
LogRecoveredEditsOutputSink(numWriters);
+      this.logRecoveredEditsOutputSink = new 
LogRecoveredEditsOutputSink(controller,
+        entryBuffers, numWriters);
       this.logRecoveredEditsOutputSink.setReporter(reporter);
     }
 
     @Override
-    void append(RegionEntryBuffer buffer) throws IOException {
+    public void append(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn("got an empty buffer, skipping");
@@ -1692,7 +1741,7 @@ public class HLogSplitter {
     }
 
     @Override
-    protected boolean flush() throws IOException {
+    public boolean flush() throws IOException {
       String curLoc = null;
       int curSize = 0;
       List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
@@ -1712,7 +1761,7 @@ public class HLogSplitter {
 
       if (curSize > 0) {
         this.processWorkItems(curLoc, curQueue);
-        dataAvailable.notifyAll();
+        controller.dataAvailable.notifyAll();
         return true;
       }
       return false;
@@ -1723,7 +1772,7 @@ public class HLogSplitter {
     }
 
     @Override
-    List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose() throws IOException {
       try {
         if (!finishWriting()) {
           return null;
@@ -1798,7 +1847,7 @@ public class HLogSplitter {
     }
 
     @Override
-    Map<byte[], Long> getOutputCounts() {
+    public Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<byte[], 
Long>(Bytes.BYTES_COMPARATOR);
       synchronized (writers) {
         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) 
{
@@ -1809,7 +1858,7 @@ public class HLogSplitter {
     }
 
     @Override
-    int getNumberOfRecoveredRegions() {
+    public int getNumberOfRecoveredRegions() {
       return this.recoveredRegions.size();
     }
 
@@ -1945,7 +1994,8 @@ public class HLogSplitter {
    * @throws IOException
    */
   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, 
CellScanner cells,
-      Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws 
IOException {
+      Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag, Durability 
durability)
+          throws IOException {
 
     if (entry == null) {
       // return an empty array
@@ -1998,6 +2048,9 @@ public class HLogSplitter {
         }
         ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
       }
+      if (m != null) {
+        m.setDurability(durability);
+      }
       previousCell = cell;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
new file mode 100644
index 0000000..4a25c44
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -0,0 +1,558 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RetryingCallable;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import 
org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.PipelineController;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.SinkWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.OutputSink;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.protobuf.ServiceException;
+
+/**
+ * A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the
+ * WAL, and sends the edits to replicas of regions.
+ */
+@InterfaceAudience.Private
+public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint 
{
+
+  private static final Log LOG = 
LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
+
+  private Configuration conf;
+  private ClusterConnection connection;
+
+  // Reuse HLogSplitter constructs as a WAL pipe
+  private PipelineController controller;
+  private RegionReplicaOutputSink outputSink;
+  private EntryBuffers entryBuffers;
+
+  // Number of writer threads
+  private int numWriterThreads;
+
+  private int operationTimeout;
+
+  private ExecutorService pool;
+
+  @Override
+  public void init(Context context) throws IOException {
+    super.init(context);
+
+    this.conf = HBaseConfiguration.create(context.getConfiguration());
+
+    String codecClassName = conf
+        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, 
WALCellCodec.class.getName());
+    conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
+
+    this.numWriterThreads = this.conf.getInt(
+      "hbase.region.replica.replication.writer.threads", 3);
+    controller = new PipelineController();
+    entryBuffers = new EntryBuffers(controller,
+      this.conf.getInt("hbase.region.replica.replication.buffersize",
+          128*1024*1024));
+
+    // use the regular RPC timeout for replica replication RPC's
+    this.operationTimeout = 
conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+  }
+
+  @Override
+  protected void doStart() {
+    try {
+      connection = (ClusterConnection) 
HConnectionManager.createConnection(ctx.getConfiguration());
+      this.pool = getDefaultThreadPool(conf);
+      outputSink = new RegionReplicaOutputSink(controller, entryBuffers, 
connection, pool,
+        numWriterThreads, operationTimeout);
+      outputSink.startWriterThreads();
+      super.doStart();
+    } catch (IOException ex) {
+      LOG.warn("Received exception while creating connection :" + ex);
+      notifyFailed(ex);
+    }
+  }
+
+  @Override
+  protected void doStop() {
+    if (outputSink != null) {
+      try {
+        outputSink.finishWritingAndClose();
+      } catch (IOException ex) {
+        LOG.warn("Got exception while trying to close OutputSink");
+        LOG.warn(ex);
+      }
+    }
+    if (this.pool != null) {
+      this.pool.shutdownNow();
+      try {
+        // wait for 10 sec
+        boolean shutdown = this.pool.awaitTermination(10000, 
TimeUnit.MILLISECONDS);
+        if (!shutdown) {
+          LOG.warn("Failed to shutdown the thread pool after 10 seconds");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Got interrupted while waiting for the thread pool to shut 
down" + e);
+      }
+    }
+    if (connection != null) {
+      try {
+        connection.close();
+      } catch (IOException ex) {
+        LOG.warn("Got exception closing connection :" + ex);
+      }
+    }
+    super.doStop();
+  }
+
+  /**
+   * Returns a Thread pool for the RPC's to region replicas. Similar to
+   * Connection's thread pool.
+   */
+  private ExecutorService getDefaultThreadPool(Configuration conf) {
+    int maxThreads = 
conf.getInt("hbase.region.replica.replication.threads.max", 256);
+    int coreThreads = 
conf.getInt("hbase.region.replica.replication.threads.core", 16);
+    if (maxThreads == 0) {
+      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+    }
+    if (coreThreads == 0) {
+      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+    }
+    long keepAliveTime = 
conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
+    LinkedBlockingQueue<Runnable> workQueue =
+        new LinkedBlockingQueue<Runnable>(maxThreads *
+            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+      coreThreads,
+      maxThreads,
+      keepAliveTime,
+      TimeUnit.SECONDS,
+      workQueue,
+      Threads.newDaemonThreadFactory(this.getClass().toString() + 
"-rpc-shared-"));
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  @Override
+  public boolean replicate(ReplicateContext replicateContext) {
+    /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
+     *
+     * RRRE relies on batching from two different mechanisms. The first is the 
batching from
+     * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS 
reads from a single
+     * WAL file filling up a buffer of heap size 
"replication.source.size.capacity"(64MB) or at most
+     * "replication.source.nb.capacity" entries or until it sees the end of 
file (in live tailing).
+     * Then RS passes all the buffered edits in this replicate() call context. 
RRRE puts the edits
+     * to the HLogSplitter.EntryBuffers which is a blocking buffer space of up 
to
+     * "hbase.region.replica.replication.buffersize" (128MB) in size. This 
buffer splits the edits
+     * based on regions.
+     *
+     * There are "hbase.region.replica.replication.writer.threads"(default 3) 
writer threads which
+     * pick largest per-region buffer and send it to the SinkWriter (see 
RegionReplicaOutputSink).
+     * The SinkWriter in this case will send the wal edits to all secondary 
region replicas in
+     * parallel via a retrying rpc call. EntryBuffers guarantees that while a 
buffer is
+     * being written to the sink, another buffer for the same region will not 
be made available to
+     * writers ensuring regions edits are not replayed out of order.
+     *
+     * The replicate() call won't return until all the buffers are sent and 
ack'd by the sinks so
+     * that the replication can assume all edits are persisted. We may be able 
to do a better
+     * pipelining between the replication thread and output sinks later if it 
becomes a bottleneck.
+     */
+
+    while (this.isRunning()) {
+      try {
+        for (Entry entry: replicateContext.getEntries()) {
+          entryBuffers.appendEntry(entry);
+        }
+        outputSink.flush(); // make sure everything is flushed
+        return true;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (IOException e) {
+        LOG.warn("Received IOException while trying to replicate"
+            + StringUtils.stringifyException(e));
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean canReplicateToSameCluster() {
+    return true;
+  }
+
+  @Override
+  protected WALEntryFilter getScopeWALEntryFilter() {
+    // we do not care about scope. We replicate everything.
+    return null;
+  }
+
+  static class RegionReplicaOutputSink extends OutputSink {
+    private RegionReplicaSinkWriter sinkWriter;
+
+    public RegionReplicaOutputSink(PipelineController controller, EntryBuffers 
entryBuffers,
+        ClusterConnection connection, ExecutorService pool, int numWriters, 
int operationTimeout) {
+      super(controller, entryBuffers, numWriters);
+      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, 
operationTimeout);
+    }
+
+    @Override
+    public void append(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.getEntryBuffer();
+
+      if (entries.isEmpty() || 
entries.get(0).getEdit().getKeyValues().isEmpty()) {
+        return;
+      }
+
+      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
+        entries.get(0).getEdit().getKeyValues().get(0).getRow(), entries);
+    }
+
+    @Override
+    public boolean flush() throws IOException {
+      // nothing much to do for now. Wait for the Writer threads to finish up
+      // append()'ing the data.
+      entryBuffers.waitUntilDrained();
+      return super.flush();
+    }
+
+    @Override
+    public List<Path> finishWritingAndClose() throws IOException {
+      finishWriting();
+      return null;
+    }
+
+    @Override
+    public Map<byte[], Long> getOutputCounts() {
+      return null; // only used in tests
+    }
+
+    @Override
+    public int getNumberOfRecoveredRegions() {
+      return 0;
+    }
+
+    AtomicLong getSkippedEditsCounter() {
+      return skippedEdits;
+    }
+  }
+
+  static class RegionReplicaSinkWriter extends SinkWriter {
+    RegionReplicaOutputSink sink;
+    ClusterConnection connection;
+    RpcControllerFactory rpcControllerFactory;
+    RpcRetryingCallerFactory rpcRetryingCallerFactory;
+    int operationTimeout;
+    ExecutorService pool;
+    Cache<TableName, Boolean> disabledAndDroppedTables;
+
+    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, 
ClusterConnection connection,
+        ExecutorService pool, int operationTimeout) {
+      this.sink = sink;
+      this.connection = connection;
+      this.operationTimeout = operationTimeout;
+      this.rpcRetryingCallerFactory
+        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
+      this.rpcControllerFactory = 
RpcControllerFactory.instantiate(connection.getConfiguration());
+      this.pool = pool;
+
+      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
+        
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
 5000);
+      // A cache for non existing tables that have a default expiry of 5 sec. 
This means that if the
+      // table is created again with the same name, we might miss to replicate 
for that amount of
+      // time. But this cache prevents overloading meta requests for every 
edit from a deleted file.
+      disabledAndDroppedTables = CacheBuilder.newBuilder()
+        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
+        .initialCapacity(10)
+        .maximumSize(1000)
+        .build();
+    }
+
+    public void append(TableName tableName, byte[] encodedRegionName, byte[] 
row,
+        List<Entry> entries) throws IOException {
+
+      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
+        sink.getSkippedEditsCounter().incrementAndGet();
+        return;
+      }
+
+      // get the replicas of the primary region
+      RegionLocations locations = null;
+      try {
+        locations = getRegionLocations(connection, tableName, row, true, 0);
+
+        if (locations == null) {
+          throw new HBaseIOException("Cannot locate locations for "
+              + tableName + ", row:" + Bytes.toStringBinary(row));
+        }
+      } catch (TableNotFoundException e) {
+        disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to 
cache. Value ignored
+        // skip this entry
+        sink.getSkippedEditsCounter().addAndGet(entries.size());
+        return;
+      }
+
+      if (locations.size() == 1) {
+        return;
+      }
+
+      ArrayList<Future<ReplicateWALEntryResponse>> tasks
+        = new ArrayList<Future<ReplicateWALEntryResponse>>(2);
+
+      // check whether we should still replay this entry. If the regions are 
changed, or the
+      // entry is not coming form the primary region, filter it out.
+      HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
+      if 
(!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
+        encodedRegionName)) {
+        sink.getSkippedEditsCounter().addAndGet(entries.size());
+        return;
+      }
+
+
+      // All passed entries should belong to one region because it is coming 
from the EntryBuffers
+      // split per region. But the regions might split and merge (unlike log 
recovery case).
+      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
+        HRegionLocation location = locations.getRegionLocation(replicaId);
+        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
+          HRegionInfo regionInfo = location == null
+              ? RegionReplicaUtil.getRegionInfoForReplica(
+                locations.getDefaultRegionLocation().getRegionInfo(), 
replicaId)
+              : location.getRegionInfo();
+          RegionReplicaReplayCallable callable = new 
RegionReplicaReplayCallable(connection,
+            rpcControllerFactory, tableName, location, regionInfo, row, 
entries,
+            sink.getSkippedEditsCounter());
+           Future<ReplicateWALEntryResponse> task = pool.submit(
+             new 
RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory,
+                 callable, operationTimeout));
+           tasks.add(task);
+        }
+      }
+
+      boolean tasksCancelled = false;
+      for (Future<ReplicateWALEntryResponse> task : tasks) {
+        try {
+          task.get();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(e.getMessage());
+        } catch (ExecutionException e) {
+          Throwable cause = e.getCause();
+          if (cause instanceof IOException) {
+            // The table can be disabled or dropped at this time. For disabled 
tables, we have no
+            // cheap mechanism to detect this case because meta does not 
contain this information.
+            // HConnection.isTableDisabled() is a zk call which we cannot do 
for every replay RPC.
+            // So instead we start the replay RPC with retries and
+            // check whether the table is dropped or disabled which might cause
+            // SocketTimeoutException, or RetriesExhaustedException or similar 
if we get IOE.
+            if (cause instanceof TableNotFoundException || 
connection.isTableDisabled(tableName)) {
+              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to 
cache for later.
+              if (!tasksCancelled) {
+                sink.getSkippedEditsCounter().addAndGet(entries.size());
+                tasksCancelled = true; // so that we do not add to skipped 
counter again
+              }
+              continue;
+            }
+            // otherwise rethrow
+            throw (IOException)cause;
+          }
+          // unexpected exception
+          throw new IOException(cause);
+        }
+      }
+    }
+  }
+
+  static class RetryingRpcCallable<V> implements Callable<V> {
+    RpcRetryingCallerFactory factory;
+    RetryingCallable<V> callable;
+    int timeout;
+    public RetryingRpcCallable(RpcRetryingCallerFactory factory, 
RetryingCallable<V> callable,
+        int timeout) {
+      this.factory = factory;
+      this.callable = callable;
+      this.timeout = timeout;
+    }
+    @Override
+    public V call() throws Exception {
+      return factory.<V>newCaller().callWithRetries(callable, timeout);
+    }
+  }
+
+  /**
+   * Calls replay on the passed edits for the given set of entries belonging 
to the region. It skips
+   * the entry if the region boundaries have changed or the region is gone.
+   */
+  static class RegionReplicaReplayCallable
+    extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
+    // replicaId of the region replica that we want to replicate to
+    private final int replicaId;
+
+    private final List<HLog.Entry> entries;
+    private final byte[] initialEncodedRegionName;
+    private final AtomicLong skippedEntries;
+    private final RpcControllerFactory rpcControllerFactory;
+    private boolean skip;
+
+    public RegionReplicaReplayCallable(ClusterConnection connection,
+        RpcControllerFactory rpcControllerFactory, TableName tableName,
+        HRegionLocation location, HRegionInfo regionInfo, byte[] 
row,List<HLog.Entry> entries,
+        AtomicLong skippedEntries) {
+      super(connection, location, tableName, row);
+      this.replicaId = regionInfo.getReplicaId();
+      this.entries = entries;
+      this.rpcControllerFactory = rpcControllerFactory;
+      this.skippedEntries = skippedEntries;
+      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
+    }
+
+    @Override
+    public HRegionLocation getLocation(boolean useCache) throws IOException {
+      RegionLocations rl = getRegionLocations(connection, tableName, row, 
useCache, replicaId);
+      if (rl == null) {
+        throw new HBaseIOException(getExceptionMessage());
+      }
+      location = rl.getRegionLocation(replicaId);
+      if (location == null) {
+        throw new HBaseIOException(getExceptionMessage());
+      }
+
+      // check whether we should still replay this entry. If the regions are 
changed, or the
+      // entry is not coming form the primary region, filter it out because we 
do not need it.
+      // Regions can change because of (1) region split (2) region merge (3) 
table recreated
+      if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
+        initialEncodedRegionName)) {
+        skip = true;
+        return null;
+      }
+
+      return location;
+    }
+
+    @Override
+    public ReplicateWALEntryResponse call(int timeout) throws IOException {
+      return replayToServer(this.entries, timeout);
+    }
+
+    private ReplicateWALEntryResponse replayToServer(List<HLog.Entry> entries, 
int timeout)
+        throws IOException {
+      if (entries.isEmpty() || skip) {
+        skippedEntries.incrementAndGet();
+        return ReplicateWALEntryResponse.newBuilder().build();
+      }
+
+      HLog.Entry[] entriesArray = new HLog.Entry[entries.size()];
+      entriesArray = entries.toArray(entriesArray);
+
+      // set the region name for the target region replica
+      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
+          ReplicationProtbufUtil.buildReplicateWALEntryRequest(
+            entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
+      try {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController(p.getSecond());
+        controller.setCallTimeout(timeout);
+        controller.setPriority(tableName);
+        return stub.replay(controller, p.getFirst());
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    @Override
+    protected String getExceptionMessage() {
+      return super.getExceptionMessage() + " table=" + tableName
+          + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
+    }
+  }
+
+  private static RegionLocations getRegionLocations(
+      ClusterConnection connection, TableName tableName, byte[] row,
+      boolean useCache, int replicaId)
+      throws RetriesExhaustedException, DoNotRetryIOException, 
InterruptedIOException {
+    RegionLocations rl;
+    try {
+      rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
+    } catch (DoNotRetryIOException e) {
+      throw e;
+    } catch (RetriesExhaustedException e) {
+      throw e;
+    } catch (InterruptedIOException e) {
+      throw e;
+    } catch (IOException e) {
+      throw new RetriesExhaustedException("Can't get the location", e);
+    }
+    if (rl == null) {
+      throw new RetriesExhaustedException("Can't get the locations");
+    }
+
+    return rl;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 87cbcc6..22c80be 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -706,7 +706,8 @@ public class ReplicationSource extends Thread
         }
         break;
       } catch (Exception ex) {
-        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown 
exception:" + ex);
+        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown 
exception:" +
+            org.apache.hadoop.util.StringUtils.stringifyException(ex));
         if (sleepForRetries("ReplicationEndpoint threw exception", 
sleepMultiplier)) {
           sleepMultiplier++;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 74a74a1..199f45e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -25,10 +25,15 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import 
org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -36,6 +41,21 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 public class ServerRegionReplicaUtil extends RegionReplicaUtil {
 
   /**
+   * Whether asynchronous WAL replication to the secondary region replicas is 
enabled or not.
+   * If this is enabled, a replication peer named "region_replica_replication" 
will be created
+   * which will tail the logs and replicate the mutatations to region replicas 
for tables that
+   * have region replication > 1. If this is enabled once, disabling this 
replication also
+   * requires disabling the replication peer using shell or ReplicationAdmin 
java class.
+   * Replication to secondary region replicas works over standard 
inter-cluster replication.·
+   * So replication, if disabled explicitly, also has to be enabled by setting 
"hbase.replication"·
+   * to true for this feature to work.
+   */
+  public static final String REGION_REPLICA_REPLICATION_CONF_KEY
+    = "hbase.region.replica.replication.enabled";
+  private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
+  private static final String REGION_REPLICA_REPLICATION_PEER = 
"region_replica_replication";
+
+  /**
    * Returns the regionInfo object to use for interacting with the file system.
    * @return An HRegionInfo object to interact with the filesystem
    */
@@ -96,4 +116,35 @@ public class ServerRegionReplicaUtil extends 
RegionReplicaUtil {
     return new StoreFileInfo(conf, fs, status, link);
   }
 
+  /**
+   * Create replication peer for replicating to region replicas if needed.
+   * @param conf configuration to use
+   * @throws IOException
+   */
+  public static void setupRegionReplicaReplication(Configuration conf) throws 
IOException {
+    if (!conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, 
DEFAULT_REGION_REPLICA_REPLICATION)) {
+      return;
+    }
+    ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
+    try {
+      if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
+        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+        peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
+        
peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
+        repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
+      }
+    } catch (ReplicationException ex) {
+      throw new IOException(ex);
+    } finally {
+      repAdmin.close();
+    }
+  }
+
+  /**
+   * Return the peer id used for replicating to secondary region replicas
+   */
+  public static String getReplicationPeerId() {
+    return REGION_REPLICA_REPLICATION_PEER;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 7aeb778..bedd480 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1532,6 +1534,18 @@ public class HBaseTestingUtility extends 
HBaseCommonTestingUtility {
     getHBaseAdmin().deleteTable(tableName);
   }
 
+  /**
+   * Drop an existing table
+   * @param tableName existing table
+   */
+  public void deleteTableIfAny(TableName tableName) throws IOException {
+    try {
+      deleteTable(tableName);
+    } catch (TableNotFoundException e) {
+      // ignore
+    }
+  }
+
   // ==========================================================================
   // Canned table and table descriptor creation
   // TODO replace HBaseTestCase
@@ -1846,7 +1860,8 @@ public class HBaseTestingUtility extends 
HBaseCommonTestingUtility {
     return rowCount;
   }
 
-  public void loadNumericRows(final HTable t, final byte[] f, int startRow, 
int endRow) throws IOException {
+  public void loadNumericRows(final HTableInterface t, final byte[] f, int 
startRow, int endRow)
+      throws IOException {
     for (int i = startRow; i < endRow; i++) {
       byte[] data = Bytes.toBytes(String.valueOf(i));
       Put put = new Put(data);
@@ -1855,7 +1870,23 @@ public class HBaseTestingUtility extends 
HBaseCommonTestingUtility {
     }
   }
 
-  public void deleteNumericRows(final HTable t, final byte[] f, int startRow, 
int endRow) throws IOException {
+  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, 
int endRow)
+      throws IOException {
+    for (int i = startRow; i < endRow; i++) {
+      String failMsg = "Failed verification of row :" + i;
+      byte[] data = Bytes.toBytes(String.valueOf(i));
+      Result result = region.get(new Get(data));
+      assertTrue(failMsg, result.containsColumn(f, null));
+      assertEquals(failMsg, result.getColumnCells(f, null).size(), 1);
+      Cell cell = result.getColumnLatestCell(f, null);
+      assertTrue(failMsg,
+        Bytes.equals(data, 0, data.length, cell.getValueArray(), 
cell.getValueOffset(),
+          cell.getValueLength()));
+    }
+  }
+
+  public void deleteNumericRows(final HTable t, final byte[] f, int startRow, 
int endRow)
+      throws IOException {
     for (int i = startRow; i < endRow; i++) {
       byte[] data = Bytes.toBytes(String.valueOf(i));
       Delete delete = new Delete(data);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index d45cda3..dd90136 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -32,7 +33,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TestMetaTableAccessor;
 import org.apache.hadoop.hbase.client.Consistency;
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -102,53 +101,9 @@ public class TestRegionReplicas {
     return HTU.getMiniHBaseCluster().getRegionServer(0);
   }
 
-  private void openRegion(HRegionInfo hri) throws Exception {
-    // first version is '0'
-    AdminProtos.OpenRegionRequest orr = 
RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null, 
null);
-    AdminProtos.OpenRegionResponse responseOpen = 
getRS().getRSRpcServices().openRegion(null, orr);
-    Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
-    Assert.assertTrue(responseOpen.getOpeningState(0).
-        equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
-    checkRegionIsOpened(hri.getEncodedName());
-  }
-
-  private void closeRegion(HRegionInfo hri) throws Exception {
-    AdminProtos.CloseRegionRequest crr = 
RequestConverter.buildCloseRegionRequest(getRS().getServerName(),
-        hri.getEncodedName());
-    AdminProtos.CloseRegionResponse responseClose = 
getRS().getRSRpcServices().closeRegion(null, crr);
-    Assert.assertTrue(responseClose.getClosed());
-
-    checkRegionIsClosed(hri.getEncodedName());
-  }
-
-  private void checkRegionIsOpened(String encodedRegionName) throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
-      Thread.sleep(1);
-    }
-
-    
Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
-  }
-
-
-  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
-      Thread.sleep(1);
-    }
-
-    try {
-      
Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
-    } catch (NotServingRegionException expected) {
-      // That's how it work: if the region is closed we have an exception.
-    }
-
-    // We don't delete the znode here, because there is not always a znode.
-  }
-
   @Test(timeout = 60000)
   public void testOpenRegionReplica() throws Exception {
-    openRegion(hriSecondary);
+    openRegion(HTU, getRS(), hriSecondary);
     try {
       //load some data to primary
       HTU.loadNumericRows(table, f, 0, 1000);
@@ -157,14 +112,14 @@ public class TestRegionReplicas {
       Assert.assertEquals(1000, HTU.countRows(table));
     } finally {
       HTU.deleteNumericRows(table, f, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
   /** Tests that the meta location is saved for secondary regions */
   @Test(timeout = 60000)
   public void testRegionReplicaUpdatesMetaLocation() throws Exception {
-    openRegion(hriSecondary);
+    openRegion(HTU, getRS(), hriSecondary);
     HTable meta = null;
     try {
       meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME);
@@ -172,7 +127,7 @@ public class TestRegionReplicas {
         , getRS().getServerName(), -1, 1, false);
     } finally {
       if (meta != null ) meta.close();
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -186,7 +141,7 @@ public class TestRegionReplicas {
       // flush so that region replica can read
       getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
 
-      openRegion(hriSecondary);
+      openRegion(HTU, getRS(), hriSecondary);
 
       // first try directly against region
       HRegion region = 
getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
@@ -195,7 +150,7 @@ public class TestRegionReplicas {
       assertGetRpc(hriSecondary, 42, true);
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -209,7 +164,7 @@ public class TestRegionReplicas {
       // flush so that region replica can read
       getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
 
-      openRegion(hriSecondary);
+      openRegion(HTU, getRS(), hriSecondary);
 
       // try directly Get against region replica
       byte[] row = Bytes.toBytes(String.valueOf(42));
@@ -220,7 +175,7 @@ public class TestRegionReplicas {
       Assert.assertArrayEquals(row, result.getValue(f, null));
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -236,7 +191,8 @@ public class TestRegionReplicas {
   }
 
   // build a mock rpc
-  private void assertGetRpc(HRegionInfo info, int value, boolean expect) 
throws IOException, ServiceException {
+  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
+      throws IOException, ServiceException {
     byte[] row = Bytes.toBytes(String.valueOf(value));
     Get get = new Get(row);
     ClientProtos.GetRequest getReq = 
RequestConverter.buildGetRequest(info.getRegionName(), get);
@@ -259,13 +215,14 @@ public class TestRegionReplicas {
     // enable store file refreshing
     final int refreshPeriod = 2000; // 2 sec
     HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
-    
HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
 refreshPeriod);
+    
HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+      refreshPeriod);
     // restart the region server so that it starts the refresher chore
     restartRegionServer();
 
     try {
       LOG.info("Opening the secondary region " + 
hriSecondary.getEncodedName());
-      openRegion(hriSecondary);
+      openRegion(HTU, getRS(), hriSecondary);
 
       //load some data to primary
       LOG.info("Loading data to primary region");
@@ -321,7 +278,7 @@ public class TestRegionReplicas {
 
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -338,7 +295,7 @@ public class TestRegionReplicas {
     final int startKey = 0, endKey = 1000;
 
     try {
-      openRegion(hriSecondary);
+      openRegion(HTU, getRS(), hriSecondary);
 
       //load some data to primary so that reader won't fail
       HTU.loadNumericRows(table, f, startKey, endKey);
@@ -402,13 +359,13 @@ public class TestRegionReplicas {
               // whether to do a close and open
               if (random.nextInt(10) == 0) {
                 try {
-                  closeRegion(hriSecondary);
+                  closeRegion(HTU, getRS(), hriSecondary);
                 } catch (Exception ex) {
                   LOG.warn("Failed closing the region " + hriSecondary + " "  
+ StringUtils.stringifyException(ex));
                   exceptions[2].compareAndSet(null, ex);
                 }
                 try {
-                  openRegion(hriSecondary);
+                  openRegion(HTU, getRS(), hriSecondary);
                 } catch (Exception ex) {
                   LOG.warn("Failed opening the region " + hriSecondary + " "  
+ StringUtils.stringifyException(ex));
                   exceptions[2].compareAndSet(null, ex);
@@ -443,7 +400,7 @@ public class TestRegionReplicas {
 
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, 
endKey);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 3164f1d..ed5dfbf 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -147,49 +147,55 @@ public class TestRegionServerNoMaster {
   }
 
 
-  /**
-   * Reopen the region. Reused in multiple tests as we always leave the region 
open after a test.
-   */
-  private void reopenRegion() throws Exception {
+  public static void openRegion(HBaseTestingUtility HTU, HRegionServer rs, 
HRegionInfo hri)
+      throws Exception {
     AdminProtos.OpenRegionRequest orr =
-      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 
null, null);
-    AdminProtos.OpenRegionResponse responseOpen = 
getRS().rpcServices.openRegion(null, orr);
+      RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, null, 
null);
+    AdminProtos.OpenRegionResponse responseOpen = 
rs.rpcServices.openRegion(null, orr);
+
     Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
     Assert.assertTrue(responseOpen.getOpeningState(0).
         equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
 
 
-    checkRegionIsOpened();
+    checkRegionIsOpened(HTU, rs, hri);
   }
 
-  private void checkRegionIsOpened() throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+  public static void checkRegionIsOpened(HBaseTestingUtility HTU, 
HRegionServer rs,
+      HRegionInfo hri) throws Exception {
+    while (!rs.getRegionsInTransitionInRS().isEmpty()) {
       Thread.sleep(1);
     }
 
-    Assert.assertTrue(getRS().getRegion(regionName).isAvailable());
+    Assert.assertTrue(rs.getRegion(hri.getRegionName()).isAvailable());
   }
 
+  public static void closeRegion(HBaseTestingUtility HTU, HRegionServer rs, 
HRegionInfo hri)
+      throws Exception {
+    AdminProtos.CloseRegionRequest crr = 
RequestConverter.buildCloseRegionRequest(
+      rs.getServerName(), hri.getEncodedName());
+    AdminProtos.CloseRegionResponse responseClose = 
rs.rpcServices.closeRegion(null, crr);
+    Assert.assertTrue(responseClose.getClosed());
+    checkRegionIsClosed(HTU, rs, hri);
+  }
 
-  private void checkRegionIsClosed() throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+  public static void checkRegionIsClosed(HBaseTestingUtility HTU, 
HRegionServer rs,
+      HRegionInfo hri) throws Exception {
+    while (!rs.getRegionsInTransitionInRS().isEmpty()) {
       Thread.sleep(1);
     }
 
     try {
-      Assert.assertFalse(getRS().getRegion(regionName).isAvailable());
+      Assert.assertFalse(rs.getRegion(hri.getRegionName()).isAvailable());
     } catch (NotServingRegionException expected) {
       // That's how it work: if the region is closed we have an exception.
     }
   }
 
-
   /**
    * Close the region without using ZK
    */
-  private void closeNoZK() throws Exception {
+  private void closeRegionNoZK() throws Exception {
     // no transition in ZK
     AdminProtos.CloseRegionRequest crr =
         RequestConverter.buildCloseRegionRequest(getRS().getServerName(), 
regionName);
@@ -197,14 +203,14 @@ public class TestRegionServerNoMaster {
     Assert.assertTrue(responseClose.getClosed());
 
     // now waiting & checking. After a while, the transition should be done 
and the region closed
-    checkRegionIsClosed();
+    checkRegionIsClosed(HTU, getRS(), hri);
   }
 
 
   @Test(timeout = 60000)
   public void testCloseByRegionServer() throws Exception {
-    closeNoZK();
-    reopenRegion();
+    closeRegionNoZK();
+    openRegion(HTU, getRS(), hri);
   }
 
   /**
@@ -221,8 +227,8 @@ public class TestRegionServerNoMaster {
   public void testMultipleOpen() throws Exception {
 
     // We close
-    closeNoZK();
-    checkRegionIsClosed();
+    closeRegionNoZK();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
     // We're sending multiple requests in a row. The region server must handle 
this nicely.
     for (int i = 0; i < 10; i++) {
@@ -238,7 +244,7 @@ public class TestRegionServerNoMaster {
       );
     }
 
-    checkRegionIsOpened();
+    checkRegionIsOpened(HTU, getRS(), hri);
   }
 
   @Test
@@ -279,9 +285,9 @@ public class TestRegionServerNoMaster {
       }
     }
 
-    checkRegionIsClosed();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
-    reopenRegion();
+    openRegion(HTU, getRS(), hri);
   }
 
   /**
@@ -290,8 +296,8 @@ public class TestRegionServerNoMaster {
   @Test(timeout = 60000)
   public void testCancelOpeningWithoutZK() throws Exception {
     // We close
-    closeNoZK();
-    checkRegionIsClosed();
+    closeRegionNoZK();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
     // Let do the initial steps, without having a handler
     getRS().getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), 
Boolean.TRUE);
@@ -315,9 +321,9 @@ public class TestRegionServerNoMaster {
     getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd));
 
     // The open handler should have removed the region from RIT but kept the 
region closed
-    checkRegionIsClosed();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
-    reopenRegion();
+    openRegion(HTU, getRS(), hri);
   }
 
   /**
@@ -341,7 +347,7 @@ public class TestRegionServerNoMaster {
     }
 
     //actual close
-    closeNoZK();
+    closeRegionNoZK();
     try {
       AdminProtos.OpenRegionRequest orr = 
RequestConverter.buildOpenRegionRequest(
         earlierServerName, hri, null, null);
@@ -351,7 +357,7 @@ public class TestRegionServerNoMaster {
       Assert.assertTrue(se.getCause() instanceof IOException);
       Assert.assertTrue(se.getCause().getMessage().contains("This RPC was 
intended for a different server"));
     } finally {
-      reopenRegion();
+      openRegion(HTU, getRS(), hri);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
index cdf71f6..0718f48 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -30,13 +30,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
+import 
org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.PipelineController;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.mockito.Mockito.mock;
-
 /**
  * Simple testing of a few HLog methods.
  */
@@ -45,7 +44,7 @@ public class TestHLogMethods {
   private static final byte[] TEST_REGION = Bytes.toBytes("test_region");;
   private static final TableName TEST_TABLE =
       TableName.valueOf("test_table");
-  
+
   private final HBaseTestingUtility util = new HBaseTestingUtility();
 
   /**
@@ -108,27 +107,25 @@ public class TestHLogMethods {
     reb.appendEntry(createTestLogEntry(1));
     assertTrue(reb.heapSize() > 0);
   }
-  
+
   @Test
   public void testEntrySink() throws Exception {
     Configuration conf = new Configuration();
-    RecoveryMode mode = 
(conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+    RecoveryMode mode = 
(conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
       RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
-    HLogSplitter splitter = new HLogSplitter(
-      conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode);
 
-    EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
+    EntryBuffers sink = new EntryBuffers(new PipelineController(), 
1*1024*1024);
     for (int i = 0; i < 1000; i++) {
       HLog.Entry entry = createTestLogEntry(i);
       sink.appendEntry(entry);
     }
-    
+
     assertTrue(sink.totalBuffered > 0);
     long amountInChunk = sink.totalBuffered;
     // Get a chunk
     RegionEntryBuffer chunk = sink.getChunkToWrite();
     assertEquals(chunk.heapSize(), amountInChunk);
-    
+
     // Make sure it got marked that a thread is "working on this"
     assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
 
@@ -136,26 +133,26 @@ public class TestHLogMethods {
     for (int i = 0; i < 500; i++) {
       HLog.Entry entry = createTestLogEntry(i);
       sink.appendEntry(entry);
-    }    
+    }
     // Asking for another chunk shouldn't work since the first one
     // is still writing
     assertNull(sink.getChunkToWrite());
-    
+
     // If we say we're done writing the first chunk, then we should be able
     // to get the second
     sink.doneWriting(chunk);
-    
+
     RegionEntryBuffer chunk2 = sink.getChunkToWrite();
     assertNotNull(chunk2);
     assertNotSame(chunk, chunk2);
     long amountInChunk2 = sink.totalBuffered;
     // The second chunk had fewer rows than the first
     assertTrue(amountInChunk2 < amountInChunk);
-    
+
     sink.doneWriting(chunk2);
     assertEquals(0, sink.totalBuffered);
   }
-  
+
   private HLog.Entry createTestLogEntry(int i) {
     long seq = i;
     long now = i * 1000;

Reply via email to