HBASE-15883 Adding WAL files and tracking offsets in HBase.

Implemented ReplicationQueuesHBaseImpl that tracks WAL offsets and replication 
queues in an HBase table.
Only the basic tracking methods are written so far; claimQueues() and the 
HFileRef methods are not implemented yet.
Wrote a basic unit test for ReplicationQueuesHBaseImpl that tests the 
implemented functions on a single Region Server.

Signed-off-by: Elliott Clark <elli...@fb.com>
Signed-off-by: Elliott Clark <ecl...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/21e98271
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/21e98271
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/21e98271

Branch: refs/heads/hbase-12439
Commit: 21e98271c32f0d44106515a72b2c92d518c03668
Parents: 9a53d8b
Author: Joseph Hwang <j...@fb.com>
Authored: Thu May 19 17:14:33 2016 -0700
Committer: Elliott Clark <ecl...@apache.org>
Committed: Fri Jun 3 15:23:10 2016 -0700

----------------------------------------------------------------------
 .../hbase/replication/ReplicationFactory.java   |  11 +-
 .../hbase/replication/ReplicationQueues.java    |   8 +-
 .../replication/ReplicationQueuesArguments.java |  66 +++
 .../replication/ReplicationQueuesHBaseImpl.java | 491 +++++++++++++++++++
 .../replication/ReplicationQueuesZKImpl.java    |  13 +-
 .../replication/regionserver/Replication.java   |  12 +-
 .../regionserver/ReplicationSourceManager.java  |   5 +-
 .../replication/TestReplicationAdmin.java       |   3 +-
 .../hbase/master/cleaner/TestLogsCleaner.java   |   3 +-
 .../cleaner/TestReplicationHFileCleaner.java    |   4 +-
 .../replication/TestReplicationStateBasic.java  |   2 +-
 .../TestReplicationStateHBaseImpl.java          | 243 +++++++++
 .../replication/TestReplicationStateZKImpl.java |  13 +-
 .../TestReplicationSourceManager.java           |  36 +-
 .../hadoop/hbase/util/TestHBaseFsckOneRS.java   |   4 +-
 15 files changed, 871 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 91e77ca..e264a4d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import org.apache.commons.lang.reflect.ConstructorUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
@@ -30,9 +31,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 @InterfaceAudience.Private
 public class ReplicationFactory {
 
-  public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher 
zk,
-      Configuration conf, Abortable abortable) {
-    return new ReplicationQueuesZKImpl(zk, conf, abortable);
+  public static ReplicationQueues 
getReplicationQueues(ReplicationQueuesArguments args)
+      throws Exception {
+    Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
+        "replication.ReplicationQueuesType", ReplicationQueuesZKImpl.class);
+    return (ReplicationQueues) 
ConstructorUtils.invokeConstructor(classToBuild, args);
   }
 
   public static ReplicationQueuesClient getReplicationQueuesClient(final 
ZooKeeperWatcher zk,
@@ -44,7 +47,7 @@ public class ReplicationFactory {
       Abortable abortable) {
     return getReplicationPeers(zk, conf, null, abortable);
   }
-  
+
   public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher 
zk, Configuration conf,
       final ReplicationQueuesClient queuesClient, Abortable abortable) {
     return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 0d47a88..db6da91 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -83,13 +83,13 @@ public interface ReplicationQueues {
   /**
    * Get a list of all WALs in the given queue.
    * @param queueId a String that identifies the queue
-   * @return a list of WALs, null if this region server is dead and has no 
outstanding queues
+   * @return a list of WALs, null if no such queue exists for this server
    */
   List<String> getLogsInQueue(String queueId);
 
   /**
    * Get a list of all queues for this region server.
-   * @return a list of queueIds, null if this region server is dead and has no 
outstanding queues
+   * @return a list of queueIds, an empty list if this region server is dead 
and has no outstanding queues
    */
   List<String> getAllQueues();
 
@@ -110,10 +110,10 @@ public interface ReplicationQueues {
 
   /**
    * Checks if the provided znode is the same as this region server's
-   * @param znode to check
+   * @param regionserver the id of the region server
    * @return if this is this rs's znode
    */
-  boolean isThisOurZnode(String znode);
+  boolean isThisOurRegionServer(String regionserver);
 
   /**
    * Add a peer to hfile reference queue if peer does not exist.

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
new file mode 100644
index 0000000..4907b73
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+@InterfaceAudience.Private
+public class ReplicationQueuesArguments {
+
+  private ZooKeeperWatcher zk;
+  private Configuration conf;
+  private Abortable abort;
+
+  public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
+    this.conf = conf;
+    this.abort = abort;
+  }
+
+  public ReplicationQueuesArguments(Configuration conf, Abortable abort, 
ZooKeeperWatcher zk) {
+    this(conf, abort);
+    setZk(zk);
+  }
+
+  public ZooKeeperWatcher getZk() {
+    return zk;
+  }
+
+  public void setZk(ZooKeeperWatcher zk) {
+    this.zk = zk;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Abortable getAbort() {
+    return abort;
+  }
+
+  public void setAbort(Abortable abort) {
+    this.abort = abort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
new file mode 100644
index 0000000..bbc9e32
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
@@ -0,0 +1,491 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+
+@InterfaceAudience.Private
+public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
+
+  /** Name of the HBase Table used for tracking replication*/
+  public static final TableName REPLICATION_TABLE_NAME =
+    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, 
"replication");
+
+  // Column family and column names for the Replication Table
+  private static final byte[] CF = Bytes.toBytes("r");
+  private static final byte[] COL_OWNER = Bytes.toBytes("o");
+  private static final byte[] COL_QUEUE_ID = Bytes.toBytes("q");
+
+  // Column Descriptor for the Replication Table
+  private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
+    new HColumnDescriptor(CF).setMaxVersions(1)
+      .setInMemory(true)
+      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+        // TODO: Figure out which bloom filter to use
+      .setBloomFilterType(BloomType.NONE)
+      .setCacheDataInL1(true);
+
+  // Common byte values used in replication offset tracking
+  private static final byte[] INITIAL_OFFSET = Bytes.toBytes(0L);
+
+  /*
+   * Make sure that HBase table operations for replication have a high number 
of retries. This is
+   * because the server is aborted if any HBase table operation fails. Each 
RPC will be attempted
+   * 3600 times before exiting. This provides each operation with 2 hours of 
retries
+   * before the server is aborted.
+   */
+  private static final int CLIENT_RETRIES = 3600;
+  private static final int RPC_TIMEOUT = 2000;
+  private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
+
+  private final Configuration conf;
+  private final Admin admin;
+  private final Connection connection;
+  private final Table replicationTable;
+  private final Abortable abortable;
+  private String serverName = null;
+  private byte[] serverNameBytes = null;
+
+  public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) throws 
IOException {
+    this(args.getConf(), args.getAbort());
+  }
+
+  public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort) 
throws IOException {
+    this.conf = new Configuration(conf);
+    // Modify the connection's config so that the Replication Table it returns 
has a much higher
+    // number of client retries
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.admin = connection.getAdmin();
+    this.abortable = abort;
+    replicationTable = createAndGetReplicationTable();
+    replicationTable.setRpcTimeout(RPC_TIMEOUT);
+    replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
+  }
+
+  @Override
+  public void init(String serverName) throws ReplicationException {
+    this.serverName = serverName;
+    this.serverNameBytes = Bytes.toBytes(serverName);
+  }
+
+  @Override
+  public void removeQueue(String queueId) {
+    try {
+      byte[] rowKey = this.queueIdToRowKey(queueId);
+      // The rowkey will be null if the queue cannot be found in the 
Replication Table
+      if (rowKey == null) {
+        String errMsg = "Could not remove non-existent queue with queueId=" + 
queueId;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return;
+      }
+      Delete deleteQueue = new Delete(rowKey);
+      safeQueueUpdate(deleteQueue);
+    } catch (IOException e) {
+      abortable.abort("Could not remove queue with queueId=" + queueId, e);
+    }
+  }
+
+  @Override
+  public void addLog(String queueId, String filename) throws 
ReplicationException {
+    try {
+      // Check if the queue info (Owner, QueueId) is currently stored in the 
Replication Table
+      if (this.queueIdToRowKey(queueId) == null) {
+        // Each queue will have an Owner, QueueId, and a collection of 
[WAL:offset] key values.
+        Put putNewQueue = new 
Put(Bytes.toBytes(buildServerQueueName(queueId)));
+        putNewQueue.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName));
+        putNewQueue.addColumn(CF, COL_QUEUE_ID, Bytes.toBytes(queueId));
+        putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET);
+        replicationTable.put(putNewQueue);
+      } else {
+        // Otherwise simply add the new log and offset as a new column
+        Put putNewLog = new Put(this.queueIdToRowKey(queueId));
+        putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET);
+        safeQueueUpdate(putNewLog);
+      }
+    } catch (IOException e) {
+      abortable.abort("Could not add queue queueId=" + queueId + " filename=" 
+ filename, e);
+    }
+  }
+
+  @Override
+  public void removeLog(String queueId, String filename) {
+    try {
+      byte[] rowKey = this.queueIdToRowKey(queueId);
+      if (rowKey == null) {
+        String errMsg = "Could not remove log from non-existent queueId=" + 
queueId + ", filename="
+          + filename;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return;
+      }
+      Delete delete = new Delete(rowKey);
+      delete.addColumns(CF, Bytes.toBytes(filename));
+      safeQueueUpdate(delete);
+    } catch (IOException e) {
+      abortable.abort("Could not remove log from queueId=" + queueId + ", 
filename=" + filename, e);
+    }
+  }
+
+  @Override
+  public void setLogPosition(String queueId, String filename, long position) {
+    try {
+      byte[] rowKey = this.queueIdToRowKey(queueId);
+      if (rowKey == null) {
+        String errMsg = "Could not set position of log from non-existent 
queueId=" + queueId +
+          ", filename=" + filename;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return;
+      }
+      // Check that the log exists. addLog() must have been called before 
setLogPosition().
+      Get checkLogExists = new Get(rowKey);
+      checkLogExists.addColumn(CF, Bytes.toBytes(filename));
+      if (!replicationTable.exists(checkLogExists)) {
+        String errMsg = "Could not set position of non-existent log from 
queueId=" + queueId +
+          ", filename=" + filename;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return;
+      }
+      // Update the log offset if it exists
+      Put walAndOffset = new Put(rowKey);
+      walAndOffset.addColumn(CF, Bytes.toBytes(filename), 
Bytes.toBytes(position));
+      safeQueueUpdate(walAndOffset);
+    } catch (IOException e) {
+      abortable.abort("Failed to write replication wal position (filename=" + 
filename +
+          ", position=" + position + ")", e);
+    }
+  }
+
+  @Override
+  public long getLogPosition(String queueId, String filename) throws 
ReplicationException {
+    try {
+      byte[] rowKey = this.queueIdToRowKey(queueId);
+      if (rowKey == null) {
+        throw new ReplicationException("Could not get position in log for 
non-existent queue " +
+            "queueId=" + queueId + ", filename=" + filename);
+      }
+      Get getOffset = new Get(rowKey);
+      getOffset.addColumn(CF, Bytes.toBytes(filename));
+      Result result = replicationTable.get(getOffset);
+      if (result.isEmpty()) {
+        throw new ReplicationException("Could not read empty result while 
getting log position " +
+            "queueId=" + queueId + ", filename=" + filename);
+      }
+      return Bytes.toLong(result.getValue(CF, Bytes.toBytes(filename)));
+    } catch (IOException e) {
+      throw new ReplicationException("Could not get position in log for 
queueId=" + queueId +
+          ", filename=" + filename);
+    }
+  }
+
+  @Override
+  public void removeAllQueues() {
+    List<String> myQueueIds = getAllQueues();
+    for (String queueId : myQueueIds) {
+      removeQueue(queueId);
+    }
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String queueId) {
+    List<String> logs = new ArrayList<String>();
+    try {
+      byte[] rowKey = this.queueIdToRowKey(queueId);
+      if (rowKey == null) {
+        String errMsg = "Could not get logs from non-existent queueId=" + 
queueId;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return null;
+      }
+      Get getQueue = new Get(rowKey);
+      Result queue = replicationTable.get(getQueue);
+      if (queue.isEmpty()) {
+        return null;
+      }
+      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF);
+      for (byte[] cQualifier : familyMap.keySet()) {
+        if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, 
COL_QUEUE_ID)) {
+          continue;
+        }
+        logs.add(Bytes.toString(cQualifier));
+      }
+    } catch (IOException e) {
+      abortable.abort("Could not get logs from queue queueId=" + queueId, e);
+      return null;
+    }
+    return logs;
+  }
+
+  @Override
+  public List<String> getAllQueues() {
+    try {
+      return this.getQueuesBelongingToServer(serverName);
+    } catch (IOException e) {
+      abortable.abort("Could not get all replication queues", e);
+      return null;
+    }
+  }
+
+  @Override
+  public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) 
{
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<String> getListOfReplicators() {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean isThisOurRegionServer(String regionserver) {
+    return this.serverName.equals(regionserver);
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void addHFileRefs(String peerId, List<String> files) throws 
ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Gets the Replication Table. Builds and blocks until the table is 
available if the Replication
+   * Table does not exist.
+   *
+   * @return the Replication Table
+   * @throws IOException if the Replication Table takes too long to build
+   */
+  private Table createAndGetReplicationTable() throws IOException {
+    if (!replicationTableExists()) {
+      createReplicationTable();
+    }
+    int maxRetries = 
conf.getInt("replication.queues.createtable.retries.number", 100);
+    RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 
100);
+    RetryCounter retryCounter = counterFactory.create();
+    while (!replicationTableExists()) {
+      try {
+        retryCounter.sleepUntilNextRetry();
+        if (!retryCounter.shouldRetry()) {
+          throw new IOException("Unable to acquire the Replication Table");
+        }
+      } catch (InterruptedException e) {
+        return null;
+      }
+    }
+    return connection.getTable(REPLICATION_TABLE_NAME);
+  }
+
+  /**
+   * Checks whether the Replication Table exists yet
+   *
+   * @return whether the Replication Table exists
+   * @throws IOException
+   */
+  private boolean replicationTableExists() {
+    try {
+      return admin.tableExists(REPLICATION_TABLE_NAME);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Create the replication table with the provided HColumnDescriptor 
REPLICATION_COL_DESCRIPTOR
+   * in ReplicationQueuesHBaseImpl
+   * @throws IOException
+   */
+  private void createReplicationTable() throws IOException {
+    HTableDescriptor replicationTableDescriptor = new 
HTableDescriptor(REPLICATION_TABLE_NAME);
+    replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
+    admin.createTable(replicationTableDescriptor);
+  }
+
+  /**
+   * Builds the unique identifier for a queue in the Replication table by 
appending the queueId to
+   * the servername
+   *
+   * @param queueId a String that identifies the queue
+   * @return unique identifier for a queue in the Replication table
+   */
+  private String buildServerQueueName(String queueId) {
+    return serverName + "-" + queueId;
+  }
+  
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   * @param put Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Put put) {
+    RowMutations mutations = new RowMutations(put.getRow());
+    try {
+      mutations.add(put);
+    } catch (IOException e){
+      abortable.abort("Failed to update Replication Table because of 
IOException", e);
+    }
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   * @param delete Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Delete delete) {
+    RowMutations mutations = new RowMutations(delete.getRow());
+    try {
+      mutations.add(delete);
+    } catch (IOException e) {
+      abortable.abort("Failed to update Replication Table because of 
IOException", e);
+    }
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * Attempt to mutate a given queue in the Replication Table with a 
checkAndPut on the OWNER column
+   * of the queue. Abort the server if this checkAndPut fails: which means we 
have somehow lost
+   * ownership of the column or an IO Exception has occurred during the 
transaction.
+   *
+   * @param mutate Mutation to perform on a given queue
+   */
+  private void safeQueueUpdate(RowMutations mutate) {
+    try {
+      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), 
CF, COL_OWNER,
+        CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
+      if (!updateSuccess) {
+        String errMsg = "Failed to update Replication Table because we lost 
queue ownership";
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+      }
+    } catch (IOException e) {
+      abortable.abort("Failed to update Replication Table because of 
IOException", e);
+    }
+  }
+
+  /**
+   * Get the QueueIds belonging to the named server from the ReplicationTable
+   *
+   * @param server name of the server
+   * @return a list of the QueueIds belonging to the server
+   * @throws IOException
+   */
+  private List<String> getQueuesBelongingToServer(String server) throws 
IOException {
+    List<String> queues = new ArrayList<String>();
+    Scan scan = new Scan();
+    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, 
COL_OWNER,
+      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
+    scan.setFilter(filterMyQueues);
+    scan.addColumn(CF, COL_QUEUE_ID);
+    scan.addColumn(CF, COL_OWNER);
+    ResultScanner results = replicationTable.getScanner(scan);
+    for (Result result : results) {
+      queues.add(Bytes.toString(result.getValue(CF, COL_QUEUE_ID)));
+    }
+    results.close();
+    return queues;
+  }
+
+  /**
+   * Finds the row key of the HBase row corresponding to the provided queue. 
This has to be done,
+   * because the row key is [original server name + "-" + queueId0]. And the 
original server will
+   * make calls to getLog(), getQueue(), etc. with the argument queueId = 
queueId0.
+   * On the original server we can build the row key by concatenating 
servername + queueId0.
+   * Yet if the queue is claimed by another server, future calls to getLog(), 
getQueue(), etc.
+   * will be made with the argument queueId = queueId0 + "-" + pastOwner0 + 
"-" + pastOwner1 ...
+   * so we need a way to look up rows by their modified queueId's.
+   *
+   * TODO: Consider updating the queueId passed to getLog, getQueue()... 
inside of ReplicationSource
+   * TODO: and ReplicationSourceManager or the parsing of the passed in 
queueId's so that we don't
+   * TODO have to scan the table for row keys for each update. See HBASE-15956.
+   *
+   * TODO: We can also cache queueId's if ReplicationQueuesHBaseImpl becomes a 
bottleneck. We
+   * TODO: currently perform scan's over all the rows looking for one with a 
matching QueueId.
+   *
+   * @param queueId string representation of the queue id
+   * @return the rowkey of the corresponding queue. This returns null if the 
corresponding queue
+   * cannot be found.
+   * @throws IOException
+   */
+  private byte[] queueIdToRowKey(String queueId) throws IOException {
+    Scan scan = new Scan();
+    scan.addColumn(CF, COL_QUEUE_ID);
+    scan.addColumn(CF, COL_OWNER);
+    scan.setMaxResultSize(1);
+    // Search for the queue that matches this queueId
+    SingleColumnValueFilter filterByQueueId = new SingleColumnValueFilter(CF, 
COL_QUEUE_ID,
+        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(queueId));
+    // Make sure that we are the owners of the queue. QueueId's may overlap.
+    SingleColumnValueFilter filterByOwner = new SingleColumnValueFilter(CF, 
COL_OWNER,
+        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(serverName));
+    // We only want the row key
+    FirstKeyOnlyFilter filterOutColumns = new FirstKeyOnlyFilter();
+    FilterList filterList = new FilterList(filterByQueueId, filterByOwner, 
filterOutColumns);
+    scan.setFilter(filterList);
+    ResultScanner results = replicationTable.getScanner(scan);
+    Result result = results.next();
+    results.close();
+    return (result == null) ? null : result.getRow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 2bb8ea8..32d0883 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -41,7 +41,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
 /**
- * This class provides an implementation of the ReplicationQueues interface 
using ZooKeeper. The
+ * This class provides an implementation of the
+ * interface using ZooKeeper. The
  * base znode that this class works at is the myQueuesZnode. The myQueuesZnode 
contains a list of
  * all outstanding WAL files on this region server that need to be replicated. 
The myQueuesZnode is
  * the regionserver name (a concatenation of the region server’s hostname, 
client port and start
@@ -71,6 +72,10 @@ public class ReplicationQueuesZKImpl extends 
ReplicationStateZKBase implements R
 
   private static final Log LOG = 
LogFactory.getLog(ReplicationQueuesZKImpl.class);
 
+  public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
+    this(args.getZk(), args.getConf(), args.getAbort());
+  }
+
   public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
       Abortable abortable) {
     super(zk, conf, abortable);
@@ -166,8 +171,8 @@ public class ReplicationQueuesZKImpl extends 
ReplicationStateZKBase implements R
   }
 
   @Override
-  public boolean isThisOurZnode(String znode) {
-    return ZKUtil.joinZNode(this.queuesZNode, 
znode).equals(this.myQueuesZnode);
+  public boolean isThisOurRegionServer(String regionserver) {
+    return ZKUtil.joinZNode(this.queuesZNode, 
regionserver).equals(this.myQueuesZnode);
   }
 
   @Override
@@ -223,7 +228,7 @@ public class ReplicationQueuesZKImpl extends 
ReplicationStateZKBase implements R
       this.abortable.abort("Failed to get a list of queues for region server: "
           + this.myQueuesZnode, e);
     }
-    return listOfQueues;
+    return listOfQueues == null ? new ArrayList<String>() : listOfQueues;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index fa5e222..d55472d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -48,16 +48,17 @@ import 
org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.zookeeper.KeeperException;
 
@@ -127,7 +128,8 @@ public class Replication extends WALActionsListener.Base 
implements
     if (replication) {
       try {
         this.replicationQueues =
-            ReplicationFactory.getReplicationQueues(server.getZooKeeper(), 
this.conf, this.server);
+            ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, this.server,
+              server.getZooKeeper()));
         this.replicationQueues.init(this.server.getServerName().toString());
         this.replicationPeers =
             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), 
this.conf, this.server);
@@ -135,7 +137,7 @@ public class Replication extends WALActionsListener.Base 
implements
         this.replicationTracker =
             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), 
this.replicationPeers,
               this.conf, this.server, this.server);
-      } catch (ReplicationException e) {
+      } catch (Exception e) {
         throw new IOException("Failed replication handler create", e);
       }
       UUID clusterId = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b585513..ed2eecc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -315,9 +315,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    */
   public void join() {
     this.executor.shutdown();
-    if (this.sources.size() == 0) {
-      this.replicationQueues.removeAllQueues();
-    }
     for (ReplicationSourceInterface source : this.sources) {
       source.terminate("Region server is closing");
     }
@@ -624,7 +621,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
     @Override
     public void run() {
-      if (this.rq.isThisOurZnode(rsZnode)) {
+      if (this.rq.isThisOurRegionServer(rsZnode)) {
         return;
       }
       // Wait a bit before transferring the queues, we may be shutting down.

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c3241c9..06a3c7e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -160,7 +161,7 @@ public class TestReplicationAdmin {
     Configuration conf = TEST_UTIL.getConfiguration();
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null);
     ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(zkw, conf, null);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, null, zkw));
     repQueues.init("server1");
 
     // add queue for ID_ONE

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 47db32b..18950a2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -94,7 +95,7 @@ public class TestLogsCleaner {
     Replication.decorateMasterConfiguration(conf);
     Server server = new DummyServer();
     ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, 
server);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
     repQueues.init(server.getServerName().toString());
     final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
         HConstants.HREGION_OLDLOGDIR_NAME);

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index d4f23c8..1778e73 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
@@ -87,8 +88,7 @@ public class TestReplicationHFileCleaner {
     Replication.decorateMasterConfiguration(conf);
     rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, 
server);
     rp.init();
-
-    rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, 
server);
+    rq = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
     rq.init(server.getServerName().toString());
     try {
       fs = FileSystem.get(conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 144046f4..5ab26ab 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -121,7 +121,7 @@ public abstract class TestReplicationStateBasic {
     rq1.removeQueue("bogus");
     rq1.removeLog("bogus", "bogus");
     rq1.removeAllQueues();
-    assertNull(rq1.getAllQueues());
+    assertEquals(0, rq1.getAllQueues().size());
     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
     assertNull(rq1.getLogsInQueue("bogus"));
     assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, 
-1L).toString()).size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
new file mode 100644
index 0000000..8186213
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static junit.framework.TestCase.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationStateHBaseImpl {
+
+  private static Configuration conf;
+  private static HBaseTestingUtility utility;
+  private static Connection connection;
+  private static ReplicationQueues rqH;
+
+  private final String server1 = ServerName.valueOf("hostname1.example.org", 
1234, -1L).toString();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.startMiniCluster();
+    conf = utility.getConfiguration();
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
+        ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
+    connection = ConnectionFactory.createConnection(conf);
+  }
+
+  @Test
+  public void checkNamingSchema() throws Exception {
+    rqH.init(server1);
+    assertTrue(rqH.isThisOurRegionServer(server1));
+    assertTrue(!rqH.isThisOurRegionServer(server1 + "a"));
+    assertTrue(!rqH.isThisOurRegionServer(null));
+  }
+
+  @Test
+  public void testReplicationStateHBase() {
+    DummyServer ds = new DummyServer(server1);
+    try {
+      rqH = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, ds, null));
+      rqH.init(server1);
+      // Check that the proper System Tables have been generated
+      Table replicationTable = connection.getTable(
+          ReplicationQueuesHBaseImpl.REPLICATION_TABLE_NAME);
+      assertTrue(replicationTable.getName().isSystemTable());
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("testReplicationStateHBaseConstruction received an Exception");
+    }
+    try {
+      // Test adding in WAL files
+      assertEquals(0, rqH.getAllQueues().size());
+      rqH.addLog("Queue1", "WALLogFile1.1");
+      assertEquals(1, rqH.getAllQueues().size());
+      rqH.addLog("Queue1", "WALLogFile1.2");
+      rqH.addLog("Queue1", "WALLogFile1.3");
+      rqH.addLog("Queue1", "WALLogFile1.4");
+      rqH.addLog("Queue2", "WALLogFile2.1");
+      rqH.addLog("Queue3", "WALLogFile3.1");
+      assertEquals(3, rqH.getAllQueues().size());
+      assertEquals(4, rqH.getLogsInQueue("Queue1").size());
+      assertEquals(1, rqH.getLogsInQueue("Queue2").size());
+      assertEquals(1, rqH.getLogsInQueue("Queue3").size());
+      // Make sure that abortCount is still 0
+      assertEquals(0, ds.getAbortCount());
+      // Make sure that getting a log from a non-existent queue triggers an abort
+      assertNull(rqH.getLogsInQueue("Queue4"));
+      assertEquals(1, ds.getAbortCount());
+    } catch (ReplicationException e) {
+      e.printStackTrace();
+      fail("testAddLog received a ReplicationException");
+    }
+    try {
+
+      // Test updating the log positions
+      assertEquals(0L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
+      rqH.setLogPosition("Queue1", "WALLogFile1.1", 123L);
+      assertEquals(123L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
+      rqH.setLogPosition("Queue1", "WALLogFile1.1", 123456789L);
+      assertEquals(123456789L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
+      rqH.setLogPosition("Queue2", "WALLogFile2.1", 242L);
+      assertEquals(242L, rqH.getLogPosition("Queue2", "WALLogFile2.1"));
+      rqH.setLogPosition("Queue3", "WALLogFile3.1", 243L);
+      assertEquals(243L, rqH.getLogPosition("Queue3", "WALLogFile3.1"));
+
+      // Test that setting log positions in non-existing logs will cause an abort
+      assertEquals(1, ds.getAbortCount());
+      rqH.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L);
+      assertEquals(2, ds.getAbortCount());
+      rqH.setLogPosition("NotHereQueue", "NotHereFile", 243L);
+      assertEquals(3, ds.getAbortCount());
+      rqH.setLogPosition("Queue1", "NotHereFile", 243l);
+      assertEquals(4, ds.getAbortCount());
+
+      // Test reading log positions for non-existent queues and WAL's
+      try {
+        rqH.getLogPosition("Queue1", "NotHereWAL");
+        fail("Replication queue should have thrown a ReplicationException for 
reading from a " +
+            "non-existent WAL");
+      } catch (ReplicationException e) {
+      }
+      try {
+        rqH.getLogPosition("NotHereQueue", "NotHereWAL");
+        fail("Replication queue should have thrown a ReplicationException for 
reading from a " +
+            "non-existent queue");
+      } catch (ReplicationException e) {
+      }
+      // Test removing logs
+      rqH.removeLog("Queue1", "WALLogFile1.1");
+      assertEquals(3, rqH.getLogsInQueue("Queue1").size());
+      // Test removing queues
+      rqH.removeQueue("Queue2");
+      assertEquals(2, rqH.getAllQueues().size());
+      assertNull(rqH.getLogsInQueue("Queue2"));
+      // Test that getting logs from a non-existent queue aborts
+      assertEquals(5, ds.getAbortCount());
+      // Test removing all queues for a Region Server
+      rqH.removeAllQueues();
+      assertEquals(0, rqH.getAllQueues().size());
+      assertNull(rqH.getLogsInQueue("Queue1"));
+      // Test that getting logs from a non-existent queue aborts
+      assertEquals(6, ds.getAbortCount());
+    } catch (ReplicationException e) {
+      e.printStackTrace();
+      fail("testAddLog received a ReplicationException");
+    }
+  }
+
+  static class DummyServer implements Server {
+    private String serverName;
+    private boolean isAborted = false;
+    private boolean isStopped = false;
+    private int abortCount = 0;
+
+    public DummyServer(String serverName) {
+      this.serverName = serverName;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return null;
+    }
+
+    @Override
+    public CoordinatedStateManager getCoordinatedStateManager() {
+      return null;
+    }
+
+    @Override
+    public ClusterConnection getConnection() {
+      return null;
+    }
+
+    @Override
+    public MetaTableLocator getMetaTableLocator() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return ServerName.valueOf(this.serverName);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      abortCount++;
+      this.isAborted = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return this.isAborted;
+    }
+
+    @Override
+    public void stop(String why) {
+      this.isStopped = true;
+    }
+
+    @Override
+    public boolean isStopped() {
+      return this.isStopped;
+    }
+
+    @Override
+    public ChoreService getChoreService() {
+      return null;
+    }
+
+    @Override
+    public ClusterConnection getClusterConnection() {
+      return null;
+    }
+
+    public int getAbortCount() {
+      return abortCount;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 94dbb25..e731135 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -91,9 +93,14 @@ public class TestReplicationStateZKImpl extends 
TestReplicationStateBasic {
     DummyServer ds1 = new DummyServer(server1);
     DummyServer ds2 = new DummyServer(server2);
     DummyServer ds3 = new DummyServer(server3);
-    rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1);
-    rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2);
-    rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
+    try {
+      rq1 = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, ds1, zkw));
+      rq2 = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, ds2, zkw));
+      rq3 = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, ds3, zkw));
+    } catch (Exception e) {
+      // This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl
+      fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
+    }
     rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
     rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
     OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9e950d2..d1db068 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
@@ -284,9 +285,11 @@ public class TestReplicationSourceManager {
     LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server server = new DummyServer("hostname0.example.org");
+
+
     ReplicationQueues rq =
-        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), 
server.getConfiguration(),
-          server);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(server.getConfiguration(), server,
+          server.getZooKeeper()));
     rq.init(server.getServerName().toString());
     // populate some znodes in the peer znode
     files.add("log1");
@@ -326,8 +329,8 @@ public class TestReplicationSourceManager {
   public void testCleanupFailoverQueues() throws Exception {
     final Server server = new DummyServer("hostname1.example.org");
     ReplicationQueues rq =
-        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), 
server.getConfiguration(),
-          server);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(server.getConfiguration(), server,
+          server.getZooKeeper()));
     rq.init(server.getServerName().toString());
     // populate some znodes in the peer znode
     SortedSet<String> files = new TreeSet<String>();
@@ -341,7 +344,8 @@ public class TestReplicationSourceManager {
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
     ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), 
s1.getConfiguration(), s1);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
+          s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
     ReplicationPeers rp1 =
         ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), 
s1.getConfiguration(), s1);
@@ -365,7 +369,8 @@ public class TestReplicationSourceManager {
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server server = new 
DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
     ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, 
server);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, server,
+          server.getZooKeeper()));
     repQueues.init(server.getServerName().toString());
     // populate some znodes in the peer znode
     files.add("log1");
@@ -381,16 +386,19 @@ public class TestReplicationSourceManager {
 
     // simulate three servers fail sequentially
     ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), 
s1.getConfiguration(), s1);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
+          s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
     SortedMap<String, SortedSet<String>> testMap =
         rq1.claimQueues(server.getServerName().getServerName());
     ReplicationQueues rq2 =
-        ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), 
s2.getConfiguration(), s2);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s2.getConfiguration(), s2,
+          s2.getZooKeeper()));
     rq2.init(s2.getServerName().toString());
     testMap = rq2.claimQueues(s1.getServerName().getServerName());
     ReplicationQueues rq3 =
-        ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), 
s3.getConfiguration(), s3);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s3.getConfiguration(), s3,
+          s3.getZooKeeper()));
     rq3.init(s3.getServerName().toString());
     testMap = rq3.claimQueues(s2.getServerName().getServerName());
 
@@ -412,7 +420,8 @@ public class TestReplicationSourceManager {
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server s0 = new DummyServer("cversion-change0.example.org");
     ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, s0,
+          s0.getZooKeeper()));
     repQueues.init(s0.getServerName().toString());
     // populate some znodes in the peer znode
     files.add("log1");
@@ -423,7 +432,8 @@ public class TestReplicationSourceManager {
     // simulate queue transfer
     Server s1 = new DummyServer("cversion-change1.example.org");
     ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), 
s1.getConfiguration(), s1);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
+          s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
 
     ReplicationQueuesClient client =
@@ -522,8 +532,8 @@ public class TestReplicationSourceManager {
       this.deadRsZnode = znode;
       this.server = s;
       this.rq =
-          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), 
server.getConfiguration(),
-            server);
+          ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(server.getConfiguration(), server,
+            server.getZooKeeper()));
       this.rq.init(this.server.getServerName().toString());
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/21e98271/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
index 2140b39..84ef6da 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
@@ -58,6 +58,7 @@ import 
org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
@@ -1543,7 +1544,8 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck 
{
     // create replicator
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", 
connection);
     ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(zkw, conf, connection);
+        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, connection,
+          zkw));
     repQueues.init("server1");
     // queues for current peer, no errors
     repQueues.addLog("1", "file1");

Reply via email to