HBASE-20050 Reimplement updateReplicationPositions logic in serial replication based on the newly introduced replication storage layer
Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d11cdb2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d11cdb2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d11cdb2 Branch: refs/heads/HBASE-20046-branch-2 Commit: 1d11cdb26cf3c713a4f0306e05baa0c5865501dd Parents: 39c1ddc Author: huzheng <open...@gmail.com> Authored: Wed Feb 28 16:25:24 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Mon Apr 9 15:18:44 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationQueueStorage.java | 15 +++- .../replication/ZKReplicationQueueStorage.java | 88 ++++++++++++++++++-- .../replication/TestReplicationStateBasic.java | 48 ++++++++++- .../TestZKReplicationQueueStorage.java | 7 +- .../regionserver/ReplicationSourceManager.java | 4 +- 5 files changed, 146 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java index e774148..4c93da6 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -63,9 +64,19 @@ public interface ReplicationQueueStorage { * @param queueId a String that identifies the 
queue * @param fileName name of the WAL * @param position the current position in the file + * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication. */ - void setWALPosition(ServerName serverName, String queueId, String fileName, long position) - throws ReplicationException; + void setWALPosition(ServerName serverName, String queueId, String fileName, long position, + Map<String, Long> lastSeqIds) throws ReplicationException; + + /** + * Read the max sequence id of the specific region for a given peer. For serial replication, we + * need the max sequence id to decide whether we can push the next entries. + * @param encodedRegionName the encoded region name + * @param peerId peer id + * @return the max sequence id of the specific region for a given peer. + */ + long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException; /** * Get the current position for a specific WAL in a given queue for a given regionserver. http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index da96c65..adbf259 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -85,6 +87,10 @@ class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase "zookeeper.znode.replication.hfile.refs"; public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY = + "zookeeper.znode.replication.regions"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions"; + /** * The name of the znode that contains all replication queues */ @@ -95,6 +101,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase */ private final String hfileRefsZNode; + private final String regionsZNode; + public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { super(zookeeper, conf); @@ -103,6 +111,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); + this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf + .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT)); } private String getRsNode(ServerName serverName) { @@ -121,6 +131,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase return getFileNode(getQueueNode(serverName, queueId), fileName); } + /** + * Putting all regions under the /hbase/replication/regions znode would create too many children + * because of the huge number of regions in a real production environment. So we hash the encoded + * region name to distribute the znodes across multiple parent znodes. <br> + * The final znode path is formatted like this: + * + * <pre> + * /hbase/replication/regions/254/dd04e76a6966d4ffa908ed0586764767-100 + * </pre> + * + * Here 254 is the hash of the encoded region name and 100 is the peer id. + * @param encodedRegionName the encoded region name. + * @param peerId peer id for replication. 
+ * @return ZNode path to persist the max sequence id that we've pushed for the given region and + * peer. + */ + private String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) { + int hash = encodedRegionName.hashCode() & 0x0000FFFF; + String hashPath = ZNodePaths.joinZNode(regionsZNode, String.valueOf(hash)); + return ZNodePaths.joinZNode(hashPath, String.format("%s-%s", encodedRegionName, peerId)); + } + @Override public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { try { @@ -137,8 +169,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); } catch (KeeperException e) { - throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName + - ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); } } @@ -157,15 +189,55 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } @Override - public void setWALPosition(ServerName serverName, String queueId, String fileName, long position) - throws ReplicationException { + public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, + Map<String, Long> lastSeqIds) throws ReplicationException { try { - ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName), - ZKUtil.positionToByteArray(position)); + List<ZKUtilOp> listOfOps = new ArrayList<>(); + listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName), + ZKUtil.positionToByteArray(position))); + // Persist the max sequence id(s) of regions for serial replication atomically. 
+ if (lastSeqIds != null && lastSeqIds.size() > 0) { + for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { + String peerId = new ReplicationQueueInfo(queueId).getPeerId(); + String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); + /* + * Make sure that the path + * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the + * javadoc of multiOrSequential() says, if a NodeExistsException is received, all + * operations will fail. So create the path here; there is no need to add this operation + * to listOfOps: only the file position and sequence ids must be updated atomically. + */ + ZKUtil.createWithParents(zookeeper, path); + // Persist the max sequence id of region to zookeeper. + listOfOps + .add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); + } + } + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); } catch (KeeperException e) { - throw new ReplicationException("Failed to set log position (serverName=" + serverName + - ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + throw new ReplicationException("Failed to set log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + } + } + + @Override + public long getLastSequenceId(String encodedRegionName, String peerId) + throws ReplicationException { + byte[] data; + try { + data = + ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Failed to get the last sequence id(region=" + + encodedRegionName + ", peerId=" + peerId + ")"); + } + try { + return ZKUtil.parseWALPositionFrom(data); + } catch (DeserializationException de) { + LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId + + "), data=" + 
Bytes.toStringBinary(data)); } + return HConstants.NO_SEQNUM; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index fccffb5..5999c1f 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.replication; +import static org.hamcrest.CoreMatchers.hasItems; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -26,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.util.Pair; @@ -35,6 +38,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + /** * White box testing for replication state interfaces. Implementations should extend this class, and * initialize the interfaces properly. 
@@ -122,7 +127,7 @@ public abstract class TestReplicationStateBasic { assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); - rqs.setWALPosition(server3, "qId5", "filename4", 354L); + rqs.setWALPosition(server3, "qId5", "filename4", 354L, null); assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); @@ -270,6 +275,47 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(2); } + private String getFileName(String base, int i) { + return String.format(base + "-%04d", i); + } + + @Test + public void testPersistLogPositionAndSeqIdAtomically() throws Exception { + ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); + assertTrue(rqs.getAllQueues(serverName1).isEmpty()); + String queue1 = "1"; + String region0 = "region0", region1 = "region1"; + for (int i = 0; i < 10; i++) { + rqs.addWAL(serverName1, queue1, getFileName("file1", i)); + } + List<String> queueIds = rqs.getAllQueues(serverName1); + assertEquals(1, queueIds.size()); + assertThat(queueIds, hasItems("1")); + + List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1); + assertEquals(10, wals1.size()); + for (int i = 0; i < 10; i++) { + assertThat(wals1, hasItems(getFileName("file1", i))); + } + + for (int i = 0; i < 10; i++) { + assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); + } + assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1)); + assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1)); + + for (int i = 0; i < 10; i++) { + rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, + ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); + } + + for (int i = 0; i < 10; i++) { + assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, 
getFileName("file1", i))); + } + assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); + assertEquals(1000L, rqs.getLastSequenceId(region1, queue1)); + } + protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { // we can first check if the value was changed in the store, if it wasn't then fail right away if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java index 2c01a26..8ff52f3 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -127,7 +127,7 @@ public class TestZKReplicationQueueStorage { List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1); List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2); assertEquals(10, wals1.size()); - assertEquals(10, wals1.size()); + assertEquals(10, wals2.size()); for (int i = 0; i < 10; i++) { assertThat(wals1, hasItems(getFileName("file1", i))); assertThat(wals2, hasItems(getFileName("file2", i))); @@ -136,8 +136,9 @@ public class TestZKReplicationQueueStorage { for (int i = 0; i < 10; i++) { assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); - STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100); - STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10); + 
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null); + STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10, + null); } for (int i = 0; i < 10; i++) { http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index d11dc8e..eb9dba2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -482,8 +482,8 @@ public class ReplicationSourceManager implements ReplicationListener { public void logPositionAndCleanOldLogs(Path log, String queueId, long position, boolean queueRecovered) { String fileName = log.getName(); - abortWhenFail( - () -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, position)); + abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, + position, null)); cleanOldLogs(fileName, queueId, queueRecovered); }