HBASE-20050 Reimplement updateReplicationPositions logic in serial replication 
based on the newly introduced replication storage layer

Signed-off-by: zhangduo <zhang...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d11cdb2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d11cdb2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d11cdb2

Branch: refs/heads/HBASE-20046-branch-2
Commit: 1d11cdb26cf3c713a4f0306e05baa0c5865501dd
Parents: 39c1ddc
Author: huzheng <open...@gmail.com>
Authored: Wed Feb 28 16:25:24 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationQueueStorage.java    | 15 +++-
 .../replication/ZKReplicationQueueStorage.java  | 88 ++++++++++++++++++--
 .../replication/TestReplicationStateBasic.java  | 48 ++++++++++-
 .../TestZKReplicationQueueStorage.java          |  7 +-
 .../regionserver/ReplicationSourceManager.java  |  4 +-
 5 files changed, 146 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index e774148..4c93da6 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 
@@ -63,9 +64,19 @@ public interface ReplicationQueueStorage {
    * @param queueId a String that identifies the queue
    * @param fileName name of the WAL
    * @param position the current position in the file
+   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for 
serial replication.
    */
-  void setWALPosition(ServerName serverName, String queueId, String fileName, 
long position)
-      throws ReplicationException;
+  void setWALPosition(ServerName serverName, String queueId, String fileName, 
long position,
+      Map<String, Long> lastSeqIds) throws ReplicationException;
+
+  /**
+   * Read the max sequence id of the specific region for a given peer. For 
serial replication, we
+   * need the max sequence id to decide whether we can push the next entries.
+   * @param encodedRegionName the encoded region name
+   * @param peerId peer id
+   * @return the max sequence id of the specific region for a given peer.
+   */
+  long getLastSequenceId(String encodedRegionName, String peerId) throws 
ReplicationException;
 
   /**
    * Get the current position for a specific WAL in a given queue for a given 
regionserver.

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index da96c65..adbf259 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -85,6 +87,10 @@ class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
       "zookeeper.znode.replication.hfile.refs";
   public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = 
"hfile-refs";
 
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
+      "zookeeper.znode.replication.regions";
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = 
"regions";
+
   /**
    * The name of the znode that contains all replication queues
    */
@@ -95,6 +101,8 @@ class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
    */
   private final String hfileRefsZNode;
 
+  private final String regionsZNode;
+
   public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
     super(zookeeper, conf);
 
@@ -103,6 +111,8 @@ class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
       ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
     this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
     this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, 
hfileRefsZNodeName);
+    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
+        .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, 
ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
   }
 
   private String getRsNode(ServerName serverName) {
@@ -121,6 +131,28 @@ class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
     return getFileNode(getQueueNode(serverName, queueId), fileName);
   }
 
+  /**
+   * Putting all regions under the /hbase/replication/regions znode would lead to too 
many children because
+   * of the huge number of regions in a real production environment. So here we 
use the hash of the encoded
+   * region name to distribute the znodes across multiple parent znodes. <br>
+   * So the final znode path will be format like this:
+   *
+   * <pre>
+   * /hbase/replication/regions/254/dd04e76a6966d4ffa908ed0586764767-100
+   * </pre>
+   *
+   * Here 254 is the hash of the encoded region name, and 100 is the 
peer id.
+   * @param encodedRegionName the encoded region name.
+   * @param peerId peer id for replication.
+   * @return ZNode path to persist the max sequence id that we've pushed for 
the given region and
+   *         peer.
+   */
+  private String getSerialReplicationRegionPeerNode(String encodedRegionName, 
String peerId) {
+    int hash = encodedRegionName.hashCode() & 0x0000FFFF;
+    String hashPath = ZNodePaths.joinZNode(regionsZNode, String.valueOf(hash));
+    return ZNodePaths.joinZNode(hashPath, String.format("%s-%s", 
encodedRegionName, peerId));
+  }
+
   @Override
   public void removeQueue(ServerName serverName, String queueId) throws 
ReplicationException {
     try {
@@ -137,8 +169,8 @@ class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
     try {
       ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, 
fileName));
     } catch (KeeperException e) {
-      throw new ReplicationException("Failed to add wal to queue (serverName=" 
+ serverName +
-        ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
+      throw new ReplicationException("Failed to add wal to queue (serverName=" 
+ serverName
+          + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
     }
   }
 
@@ -157,15 +189,55 @@ class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
   }
 
   @Override
-  public void setWALPosition(ServerName serverName, String queueId, String 
fileName, long position)
-      throws ReplicationException {
+  public void setWALPosition(ServerName serverName, String queueId, String 
fileName, long position,
+      Map<String, Long> lastSeqIds) throws ReplicationException {
     try {
-      ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName),
-        ZKUtil.positionToByteArray(position));
+      List<ZKUtilOp> listOfOps = new ArrayList<>();
+      listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, 
fileName),
+        ZKUtil.positionToByteArray(position)));
+      // Persist the max sequence id(s) of regions for serial replication 
atomically.
+      if (lastSeqIds != null && lastSeqIds.size() > 0) {
+        for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+          String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+          String path = 
getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+          /*
+           * Make sure that the path
+           * 
/hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the 
javadoc of the
+           * multiOrSequential() method says, if a NodeExistsException is 
received, all operations will
+           * fail. So create the path here; there is no need to add this 
operation to listOfOps,
+           * because we only need to update the file position and the 
sequence id atomically.
+           */
+          ZKUtil.createWithParents(zookeeper, path);
+          // Persist the max sequence id of region to zookeeper.
+          listOfOps
+              .add(ZKUtilOp.setData(path, 
ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+        }
+      }
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
     } catch (KeeperException e) {
-      throw new ReplicationException("Failed to set log position (serverName=" 
+ serverName +
-        ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + 
position + ")", e);
+      throw new ReplicationException("Failed to set log position (serverName=" 
+ serverName
+          + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" 
+ position + ")", e);
+    }
+  }
+
+  @Override
+  public long getLastSequenceId(String encodedRegionName, String peerId)
+      throws ReplicationException {
+    byte[] data;
+    try {
+      data =
+          ZKUtil.getData(zookeeper, 
getSerialReplicationRegionPeerNode(encodedRegionName, peerId));
+    } catch (KeeperException | InterruptedException e) {
+      throw new ReplicationException("Failed to get the last sequence 
id(region="
+          + encodedRegionName + ", peerId=" + peerId + ")");
+    }
+    try {
+      return ZKUtil.parseWALPositionFrom(data);
+    } catch (DeserializationException de) {
+      LOG.warn("Failed to parse log position (region=" + encodedRegionName + 
", peerId=" + peerId
+          + "), data=" + Bytes.toStringBinary(data));
     }
+    return HConstants.NO_SEQNUM;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fccffb5..5999c1f 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.hamcrest.CoreMatchers.hasItems;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -26,6 +28,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.util.Pair;
@@ -35,6 +38,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 /**
  * White box testing for replication state interfaces. Implementations should 
extend this class, and
  * initialize the interfaces properly.
@@ -122,7 +127,7 @@ public abstract class TestReplicationStateBasic {
     assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
     assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
     assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
-    rqs.setWALPosition(server3, "qId5", "filename4", 354L);
+    rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
     assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
 
     assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
@@ -270,6 +275,47 @@ public abstract class TestReplicationStateBasic {
     assertNumberOfPeers(2);
   }
 
+  private String getFileName(String base, int i) {
+    return String.format(base + "-%04d", i);
+  }
+
+  @Test
+  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
+    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+    assertTrue(rqs.getAllQueues(serverName1).isEmpty());
+    String queue1 = "1";
+    String region0 = "region0", region1 = "region1";
+    for (int i = 0; i < 10; i++) {
+      rqs.addWAL(serverName1, queue1, getFileName("file1", i));
+    }
+    List<String> queueIds = rqs.getAllQueues(serverName1);
+    assertEquals(1, queueIds.size());
+    assertThat(queueIds, hasItems("1"));
+
+    List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
+    assertEquals(10, wals1.size());
+    for (int i = 0; i < 10; i++) {
+      assertThat(wals1, hasItems(getFileName("file1", i)));
+    }
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals(0, rqs.getWALPosition(serverName1, queue1, 
getFileName("file1", i)));
+    }
+    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
+    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
+
+    for (int i = 0; i < 10; i++) {
+      rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) 
* 100,
+        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
+    }
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, 
getFileName("file1", i)));
+    }
+    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
+    assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
+  }
+
   protected void assertConnectedPeerStatus(boolean status, String peerId) 
throws Exception {
     // we can first check if the value was changed in the store, if it wasn't 
then fail right away
     if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index 2c01a26..8ff52f3 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -127,7 +127,7 @@ public class TestZKReplicationQueueStorage {
     List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
     List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
     assertEquals(10, wals1.size());
-    assertEquals(10, wals1.size());
+    assertEquals(10, wals2.size());
     for (int i = 0; i < 10; i++) {
       assertThat(wals1, hasItems(getFileName("file1", i)));
       assertThat(wals2, hasItems(getFileName("file2", i)));
@@ -136,8 +136,9 @@ public class TestZKReplicationQueueStorage {
     for (int i = 0; i < 10; i++) {
       assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, 
getFileName("file1", i)));
       assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, 
getFileName("file2", i)));
-      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i 
+ 1) * 100);
-      STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i 
+ 1) * 100 + 10);
+      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i 
+ 1) * 100, null);
+      STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i 
+ 1) * 100 + 10,
+        null);
     }
 
     for (int i = 0; i < 10; i++) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d11dc8e..eb9dba2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -482,8 +482,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public void logPositionAndCleanOldLogs(Path log, String queueId, long 
position,
       boolean queueRecovered) {
     String fileName = log.getName();
-    abortWhenFail(
-      () -> this.queueStorage.setWALPosition(server.getServerName(), queueId, 
fileName, position));
+    abortWhenFail(() -> 
this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
+      position, null));
     cleanOldLogs(fileName, queueId, queueRecovered);
   }
 

Reply via email to