Repository: hbase Updated Branches: refs/heads/0.98 dba43b628 -> 99798411e
HBASE-16870 Add the metrics of replication sources which were transformed from other dead rs to ReplicationLoad Signed-off-by: zhangduo <zhang...@apache.org> Amending-Author: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/99798411 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/99798411 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/99798411 Branch: refs/heads/0.98 Commit: 99798411e537007a3a12795f945c431081bc1c34 Parents: dba43b6 Author: Guanghao Zhang <zghao...@gmail.com> Authored: Thu Oct 20 09:33:03 2016 +0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Sat Nov 12 09:41:06 2016 -0800 ---------------------------------------------------------------------- .../replication/regionserver/MetricsSource.java | 2 +- .../replication/regionserver/Replication.java | 13 ++- .../regionserver/ReplicationLoad.java | 26 ++++- .../regionserver/ReplicationSourceManager.java | 24 ++++- .../hbase/replication/TestReplicationBase.java | 5 +- .../replication/TestReplicationSmallTests.java | 45 --------- .../replication/TestReplicationStatus.java | 100 +++++++++++++++++++ 7 files changed, 159 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/99798411/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 134477d..c5e5ff3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ 
-95,7 +95,7 @@ public class MetricsSource implements BaseSource { public void setAgeOfLastShippedOp(long timestamp) { long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp; singleSourceSource.setLastShippedAge(age); - globalSourceSource.setLastShippedAge(age); + globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge())); this.lastTimestamp = timestamp; } http://git-wip-us.apache.org/repos/asf/hbase/blob/99798411/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 04c6f24..bd5c58a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -351,15 +351,24 @@ public class Replication implements WALActionsListener, } private void buildReplicationLoad() { - // get source - List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>(); + // get source + List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); for (ReplicationSourceInterface source : sources) { if (source instanceof ReplicationSource) { sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); } } + + // get old source + List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources(); + for (ReplicationSourceInterface source : oldSources) { + if (source instanceof ReplicationSource) { + sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + } + } + // get sink MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); 
this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); http://git-wip-us.apache.org/repos/asf/hbase/blob/99798411/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index a89da82..3e2b077 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.ArrayList; +import java.util.Map; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; @@ -66,8 +68,14 @@ public class ReplicationLoad { this.replicationLoadSink = rLoadSinkBuild.build(); // build the SourceLoad List - this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>(); + Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap = + new HashMap<String, ClusterStatusProtos.ReplicationLoadSource>(); for (MetricsSource sm : this.sourceMetricsList) { + // Get the actual peer id + String peerId = sm.getPeerID(); + String[] parts = peerId.split("-", 2); + peerId = parts.length != 1 ? 
parts[0] : peerId; + long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp(); @@ -85,17 +93,27 @@ public class ReplicationLoad { replicationLag = 0; } + ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); + if (rLoadSource != null) { + ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp); + sizeOfLogQueue += rLoadSource.getSizeOfLogQueue(); + timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(), + timeStampOfLastShippedOp); + replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag); + } + ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild = ClusterStatusProtos.ReplicationLoadSource.newBuilder(); - rLoadSourceBuild.setPeerID(sm.getPeerID()); + rLoadSourceBuild.setPeerID(peerId); rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp); rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); rLoadSourceBuild.setReplicationLag(replicationLag); - this.replicationLoadSourceList.add(rLoadSourceBuild.build()); + replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build()); } - + this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>( + replicationLoadSourceMap.values()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/99798411/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 8ba42ff..b9d7807 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -456,6 +456,9 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeRecoveredQueue(ReplicationSourceInterface src) { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); + if (src instanceof ReplicationSource) { + ((ReplicationSource) src).getSourceMetrics().clear(); + } this.oldsources.remove(src); deleteSource(src.getPeerClusterZnode(), false); this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); @@ -491,9 +494,24 @@ public class ReplicationSourceManager implements ReplicationListener { + oldSourcesToDelete.size()); // Now look for the one on this cluster List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>(); - for (ReplicationSourceInterface src : this.sources) { - if (id.equals(src.getPeerClusterId())) { - srcToRemove.add(src); + // synchronize on replicationPeers to avoid adding source for the to-be-removed peer + synchronized (this.replicationPeers) { + for (ReplicationSourceInterface src : this.sources) { + if (id.equals(src.getPeerClusterId())) { + srcToRemove.add(src); + } + } + if (srcToRemove.isEmpty()) { + LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " + + "This could mean that ReplicationSourceInterface initialization failed for this peer " + + "and that replication on this peer may not be caught up. 
peerId=" + id); + } + for (ReplicationSourceInterface toRemove : srcToRemove) { + toRemove.terminate(terminateMessage); + if (toRemove instanceof ReplicationSource) { + ((ReplicationSource) toRemove).getSourceMetrics().clear(); + } + this.sources.remove(toRemove); } } if (srcToRemove.size() == 0) { http://git-wip-us.apache.org/repos/asf/hbase/blob/99798411/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 71c9814..45d9164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -67,6 +67,7 @@ public class TestReplicationBase { protected static HBaseTestingUtility utility1; protected static HBaseTestingUtility utility2; + protected static final String PEER_ID = "2"; protected static final int NB_ROWS_IN_BATCH = 100; protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10; @@ -120,7 +121,9 @@ public class TestReplicationBase { utility2.setZkCluster(miniZK); zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true); - admin.addPeer("2", utility2.getClusterKey()); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + admin.addPeer(PEER_ID, rpc, null); LOG.info("Setup second Zk"); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); http://git-wip-us.apache.org/repos/asf/hbase/blob/99798411/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index c6f5c41..5df28a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -702,51 +702,6 @@ public class TestReplicationSmallTests extends TestReplicationBase { hadmin.close(); } - /** - * Test for HBASE-9531 - * put a few rows into htable1, which should be replicated to htable2 - * create a ClusterStatus instance 'status' from HBaseAdmin - * test : status.getLoad(server).getReplicationLoadSourceList() - * test : status.getLoad(server).getReplicationLoadSink() - * * @throws Exception - */ - @Test(timeout = 300000) - public void testReplicationStatus() throws Exception { - LOG.info("testReplicationStatus"); - - HBaseAdmin admin = utility1.getHBaseAdmin(); - try { - - final byte[] qualName = Bytes.toBytes("q"); - Put p; - - for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { - p = new Put(Bytes.toBytes("row" + i)); - p.add(famName, qualName, Bytes.toBytes("val" + i)); - htable1.put(p); - } - - ClusterStatus status = admin.getClusterStatus(); - - for (ServerName server : status.getServers()) { - ServerLoad sl = status.getLoad(server); - List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList(); - ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink(); - - // check SourceList has at least one entry - assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0)); - - // check Sink exist only as it is difficult to verify the value on the fly - assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ", - (rLoadSink.getAgeOfLastAppliedOp() >= 0)); - assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ", - (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0)); - } - } finally { - admin.close(); - } - } - @Test(timeout=300000) public void 
testVerifyReplicationPrefixFiltering() throws Exception { final byte[] prefixRow = Bytes.toBytes("prefixrow"); http://git-wip-us.apache.org/repos/asf/hbase/blob/99798411/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java new file mode 100644 index 0000000..ee46f96 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestReplicationStatus extends TestReplicationBase { + private static final Log LOG = LogFactory.getLog(TestReplicationStatus.class); + + /** + * Test for HBASE-9531 + * put a few rows into htable1, which should be replicated to htable2 + * create a ClusterStatus instance 'status' from HBaseAdmin + * test : status.getLoad(server).getReplicationLoadSourceList() + * test : status.getLoad(server).getReplicationLoadSink() + * * @throws Exception + */ + @Test(timeout = 300000) + public void testReplicationStatus() throws Exception { + LOG.info("testReplicationStatus"); + + HBaseAdmin hbaseAdmin = utility1.getHBaseAdmin(); + try { + // disable peer + admin.disablePeer(PEER_ID); + + final byte[] qualName = Bytes.toBytes("q"); + Put p; + + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + p = new Put(Bytes.toBytes("row" + i)); + p.add(famName, qualName, Bytes.toBytes("val" + i)); + htable1.put(p); + } + + ClusterStatus status = hbaseAdmin.getClusterStatus(); + + for (ServerName server : status.getServers()) { + ServerLoad sl = status.getLoad(server); + List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList(); + ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink(); + + // check SourceList only has one entry, because only has one peer + 
+ assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); + + // check Sink exist only as it is difficult to verify the value on the fly + assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ", + (rLoadSink.getAgeOfLastAppliedOp() >= 0)); + assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ", + (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0)); + } + + // Stop rs1, then the queue of rs1 will be transferred to rs0 + utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer"); + Thread.sleep(10000); + status = hbaseAdmin.getClusterStatus(); + ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + ServerLoad sl = status.getLoad(server); + List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList(); + // check SourceList still only has one entry + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); + } finally { + admin.enablePeer(PEER_ID); + utility1.getHBaseCluster().startRegionServer(); + } + } +}