HBASE-20597 Use a lock to serialize access to a shared reference to ZooKeeperWatcher in HBaseReplicationEndpoint
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/60dcef28
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/60dcef28
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/60dcef28

Branch: refs/heads/branch-2
Commit: 60dcef289b12e1650d715691eccdfa18481b432f
Parents: 12d7572
Author: Andrew Purtell <apurt...@apache.org>
Authored: Thu May 17 10:30:28 2018 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Wed May 23 16:46:20 2018 -0700

----------------------------------------------------------------------
 .../replication/HBaseReplicationEndpoint.java | 43 ++++++++++++--------
 1 file changed, 27 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/60dcef28/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index bd5c529..8286f7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -43,21 +43,22 @@ import org.slf4j.LoggerFactory;
  * target cluster is an HBase cluster.
  */
 @InterfaceAudience.Private
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
-  justification="Thinks zkw needs to be synchronized access but should be fine as is.")
 public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     implements Abortable {
 
   private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
 
-  private ZKWatcher zkw = null; // FindBugs: MT_CORRECTNESS
+  private Object zkwLock = new Object();
+  private ZKWatcher zkw = null;
 
   private List<ServerName> regionServers = new ArrayList<>(0);
   private long lastRegionServerUpdate;
 
   protected void disconnect() {
-    if (zkw != null) {
-      zkw.close();
+    synchronized (zkwLock) {
+      if (zkw != null) {
+        zkw.close();
+      }
     }
   }
 
@@ -112,7 +113,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   public synchronized UUID getPeerUUID() {
     UUID peerUUID = null;
     try {
-      peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+      synchronized (zkwLock) {
+        peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+      }
     } catch (KeeperException ke) {
       reconnect(ke);
     }
@@ -124,7 +127,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
    * @return zk connection
    */
   protected ZKWatcher getZkw() {
-    return zkw;
+    synchronized (zkwLock) {
+      return zkw;
+    }
   }
 
   /**
@@ -132,10 +137,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
    * @throws IOException If anything goes wrong connecting
    */
   void reloadZkWatcher() throws IOException {
-    if (zkw != null) zkw.close();
-    zkw = new ZKWatcher(ctx.getConfiguration(),
+    synchronized (zkwLock) {
+      if (zkw != null) {
+        zkw.close();
+      }
+      zkw = new ZKWatcher(ctx.getConfiguration(),
         "connection to cluster: " + ctx.getPeerId(), this);
-    getZkw().registerListener(new PeerRegionServerListener(this));
+      zkw.registerListener(new PeerRegionServerListener(this));
+    }
   }
 
   @Override
@@ -173,13 +182,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
    * for this peer cluster
    * @return list of addresses
    */
-  // Synchronize peer cluster connection attempts to avoid races and rate
-  // limit connections when multiple replication sources try to connect to
-  // the peer cluster. If the peer cluster is down we can get out of control
-  // over time.
-  public synchronized List<ServerName> getRegionServers() {
+  public List<ServerName> getRegionServers() {
     try {
-      setRegionServers(fetchSlavesAddresses(this.getZkw()));
+      // Synchronize peer cluster connection attempts to avoid races and rate
+      // limit connections when multiple replication sources try to connect to
+      // the peer cluster. If the peer cluster is down we can get out of control
+      // over time.
+      synchronized (zkwLock) {
+        setRegionServers(fetchSlavesAddresses(zkw));
+      }
     } catch (KeeperException ke) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Fetch slaves addresses failed", ke);