Repository: hbase Updated Branches: refs/heads/branch-1 61288f843 -> 15ed2e86e
HBASE-20855 PeerConfigTracker only supporting one listener will cause problem when there is a recovered replication queue Signed-off-by: tedyu <yuzhih...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15ed2e86 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15ed2e86 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15ed2e86 Branch: refs/heads/branch-1 Commit: 15ed2e86e143612cc99af98a40d2905892c7ec81 Parents: 61288f8 Author: jingyuntian <tianjy1...@gmail.com> Authored: Thu Jul 19 11:51:54 2018 +0800 Committer: tedyu <yuzhih...@gmail.com> Committed: Thu Jul 19 08:43:23 2018 -0700 ---------------------------------------------------------------------- .../hbase/replication/ReplicationPeer.java | 6 ++ .../replication/ReplicationPeerZKImpl.java | 32 +++++- .../replication/BaseReplicationEndpoint.java | 6 ++ .../replication/HBaseReplicationEndpoint.java | 1 + .../HBaseInterClusterReplicationEndpoint.java | 1 + .../regionserver/ReplicationSource.java | 4 + .../TestReplicationConfigTracker.java | 100 +++++++++++++++++++ 7 files changed, 145 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 200d81c..a0e758f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -81,4 +81,10 @@ public interface ReplicationPeer { * @param listener Listener for config changes, usually a replication endpoint */ void 
trackPeerConfigChanges(ReplicationPeerConfigListener listener); + + /** + * Remove a listener when it is closed or terminated + * @param listener Listener for config changes, usually a replication endpoint + */ + void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener); } http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index b79a982..57b118d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.replication; import java.io.Closeable; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -172,11 +174,22 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep @Override public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { if (this.peerConfigTracker != null){ - this.peerConfigTracker.setListener(listener); + this.peerConfigTracker.addListener(listener); } } @Override + public void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener) { + if (this.peerConfigTracker != null){ + this.peerConfigTracker.removeListener(listener); + } + } + + PeerConfigTracker getPeerConfigTracker() { + return this.peerConfigTracker; + } + + @Override public void abort(String why, Throwable e) { LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig + " was aborted for 
the following reason(s):" + why, e); @@ -275,15 +288,24 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep */ public class PeerConfigTracker extends ZooKeeperNodeTracker { - private ReplicationPeerConfigListener listener; + private Set<ReplicationPeerConfigListener> listeners; public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, Abortable abortable) { super(watcher, peerConfigNode, abortable); + listeners = new HashSet<>(); + } + + public synchronized void addListener(ReplicationPeerConfigListener listener){ + listeners.add(listener); + } + + Set<ReplicationPeerConfigListener> getListeners(){ + return this.listeners; } - public synchronized void setListener(ReplicationPeerConfigListener listener){ - this.listener = listener; + public synchronized void removeListener(ReplicationPeerConfigListener listenerToRemove) { + listeners.remove(listenerToRemove); } @Override @@ -291,7 +313,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep if (path.equals(node)) { super.nodeCreated(path); ReplicationPeerConfig config = readPeerConfig(); - if (listener != null){ + for (ReplicationPeerConfigListener listener : listeners) { listener.peerConfigUpdated(config); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 71a222a..3f6b8ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -111,4 +111,10 @@ public abstract class BaseReplicationEndpoint extends AbstractService 
return false; } + public void close(){ + if(this.ctx != null) { + ReplicationPeer peer = this.ctx.getReplicationPeer(); + peer.removeListenerOfPeerConfig(this); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 6d3e70e..a9358a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -89,6 +89,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint @Override protected void doStop() { disconnect(); + close(); notifyStopped(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 2ce195c..6c3fc99 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -446,6 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi "Aborting to prevent Replication from deadlocking. 
See HBASE-16081."; abortable.abort(errMsg, new IOException(errMsg)); } + close(); notifyStopped(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 1f91789..2396655 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -579,6 +579,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } if (allOtherTaskDone) { manager.closeRecoveredQueue(this.source); + // stop replication endpoint + if (source instanceof ReplicationSource) { + ((ReplicationSource) source).replicationEndpoint.stop(); + } LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: " + getStats()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java new file mode 100644 index 0000000..01cfde0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Checks that a {@link ReplicationPeerZKImpl.PeerConfigTracker} can hold more than one
 * peer-config listener and that listeners are removed again when their source finishes.
 */
@Category(MediumTests.class)
public class TestReplicationConfigTracker extends TestReplicationBase {
  // Previously pointed at TestReplicationKillRS.class, which mislabelled every log line.
  private static final Log LOG = LogFactory.getLog(TestReplicationConfigTracker.class);

  /**
   * Aborts one region server so the other one takes over its replication queue. The test then
   * waits until the survivor's peer config tracker has had two listeners registered (presumably
   * its regular source plus the recovered one), and finally waits for the count to drop back
   * to one.
   */
  @Test
  public void testReplicationConfigTracker() throws Exception {
    // killing the RS with hbase:meta can result in failed puts until we solve
    // IO fencing
    int rsToKill1 = utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
    int otherRs = rsToKill1 == 0 ? 1 : 0;
    final HRegionServer regionServer = utility1.getHBaseCluster().getRegionServer(otherRs);
    final Thread listenerTracker = trackListener(utility1, otherRs);
    LOG.info("Start loading table");
    utility1.loadTable(htable1, famName, true);
    LOG.info("Done loading table");
    utility1.getHBaseCluster().getRegionServer(rsToKill1).abort("Stopping as part of the test");
    utility1.getHBaseCluster().waitOnRegionServer(rsToKill1);
    while (utility1.getHBaseCluster().getMaster().getServerManager().areDeadServersInProgress()) {
      LOG.info("Waiting on processing of crashed server before proceeding...");
      Threads.sleep(1000);
    }
    // The polling thread terminates once it has seen exactly two listeners (or is interrupted).
    Waiter.waitFor(utility1.getConfiguration(), 20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return !listenerTracker.isAlive();
      }
    });
    final ReplicationPeerZKImpl.PeerConfigTracker tracker = getPeerConfigTracker(regionServer);
    // After the extra listener has been removed, only one listener should remain.
    Waiter.waitFor(utility1.getConfiguration(), 20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return tracker.getListeners().size() == 1;
      }
    });
  }

  /**
   * Starts a daemon thread that polls the peer config tracker of region server {@code rs} every
   * 50 ms. The thread terminates when exactly two listeners are registered on that tracker.
   *
   * @param utility testing utility owning the mini cluster
   * @param rs index of the region server whose tracker is watched
   * @return the already-started polling thread
   */
  private static Thread trackListener(final HBaseTestingUtility utility, final int rs) {
    Thread trackListener = new Thread() {
      @Override
      public void run() {
        ReplicationPeerZKImpl.PeerConfigTracker peerConfigTracker =
            getPeerConfigTracker(utility.getHBaseCluster().getRegionServer(rs));
        while (peerConfigTracker.getListeners().size() != 2) {
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            // Restore the interrupt status and stop polling rather than swallowing it.
            LOG.error("track config failed", e);
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    };
    trackListener.setDaemon(true);
    trackListener.start();
    return trackListener;
  }

  /**
   * Looks up the peer config tracker for {@code PEER_ID} on the given region server.
   *
   * @param rs region server whose replication source service is queried
   * @return the tracker held by that server's {@link ReplicationPeerZKImpl} for {@code PEER_ID}
   */
  private static ReplicationPeerZKImpl.PeerConfigTracker getPeerConfigTracker(HRegionServer rs) {
    Replication replication = (Replication) rs.getReplicationSourceService();
    ReplicationSourceManager manager = replication.getReplicationManager();
    ReplicationPeerZKImpl replicationPeerZK =
        (ReplicationPeerZKImpl) manager.getReplicationPeers().getPeer(PEER_ID);
    return replicationPeerZK.getPeerConfigTracker();
  }
}