Repository: hbase
Updated Branches:
  refs/heads/branch-1 61288f843 -> 15ed2e86e


HBASE-20855 PeerConfigTracker only supporting one listener will cause problem 
when there is a recovered replication queue

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15ed2e86
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15ed2e86
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15ed2e86

Branch: refs/heads/branch-1
Commit: 15ed2e86e143612cc99af98a40d2905892c7ec81
Parents: 61288f8
Author: jingyuntian <tianjy1...@gmail.com>
Authored: Thu Jul 19 11:51:54 2018 +0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Thu Jul 19 08:43:23 2018 -0700

----------------------------------------------------------------------
 .../hbase/replication/ReplicationPeer.java      |   6 ++
 .../replication/ReplicationPeerZKImpl.java      |  32 +++++-
 .../replication/BaseReplicationEndpoint.java    |   6 ++
 .../replication/HBaseReplicationEndpoint.java   |   1 +
 .../HBaseInterClusterReplicationEndpoint.java   |   1 +
 .../regionserver/ReplicationSource.java         |   4 +
 .../TestReplicationConfigTracker.java           | 100 +++++++++++++++++++
 7 files changed, 145 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 200d81c..a0e758f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -81,4 +81,10 @@ public interface ReplicationPeer {
    * @param listener Listener for config changes, usually a replication 
endpoint
    */
   void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
+
+  /**
+   * Stop notifying the given listener about peer config changes. Meant to be called when the
+   * listener is closed or terminated.
+   * @param listener Listener for config changes, usually a replication endpoint, that was
+   *          registered through {@link #trackPeerConfigChanges(ReplicationPeerConfigListener)}
+   */
+  void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index b79a982..57b118d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.replication;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -172,11 +174,22 @@ public class ReplicationPeerZKImpl extends 
ReplicationStateZKBase implements Rep
   @Override
   public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
     if (this.peerConfigTracker != null){
-      this.peerConfigTracker.setListener(listener);
+      this.peerConfigTracker.addListener(listener);
     }
   }
 
   @Override
+  public void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener) {
+    // Without a config tracker there is nothing the listener could be registered with.
+    if (this.peerConfigTracker == null) {
+      return;
+    }
+    this.peerConfigTracker.removeListener(listener);
+  }
+
+  /**
+   * Package-private accessor for this peer's config tracker.
+   * @return the tracker field as-is; other methods of this class null-check it before use, so
+   *         callers should be prepared for {@code null}
+   */
+  PeerConfigTracker getPeerConfigTracker() {
+    return peerConfigTracker;
+  }
+
+  @Override
   public void abort(String why, Throwable e) {
     LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig
         + " was aborted for the following reason(s):" + why, e);
@@ -275,15 +288,24 @@ public class ReplicationPeerZKImpl extends 
ReplicationStateZKBase implements Rep
    */
   public class PeerConfigTracker extends ZooKeeperNodeTracker {
 
-    private ReplicationPeerConfigListener listener;
+    private Set<ReplicationPeerConfigListener> listeners;
 
     public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
                              Abortable abortable) {
       super(watcher, peerConfigNode, abortable);
+      listeners = new HashSet<>();
+    }
+
+    /**
+     * Registers a listener with this tracker. Listeners are kept in a set, so registering the
+     * same listener twice stores it once.
+     * @param listener the listener to register
+     */
+    public synchronized void addListener(ReplicationPeerConfigListener listener) {
+      this.listeners.add(listener);
+    }
+
+    /**
+     * Returns a point-in-time copy of the registered listeners. The copy is taken while holding
+     * this tracker's monitor, the same lock used by the synchronized add/remove methods, so a
+     * caller on another thread never touches the internal set while it is being modified.
+     * Changes made to the returned set do not affect the tracker.
+     * @return a new set holding the listeners registered at the time of the call
+     */
+    synchronized Set<ReplicationPeerConfigListener> getListeners() {
+      return new HashSet<ReplicationPeerConfigListener>(this.listeners);
     }
 
-    public synchronized void setListener(ReplicationPeerConfigListener 
listener){
-      this.listener = listener;
+    /**
+     * Unregisters a listener from this tracker. Removing a listener that is not registered
+     * leaves the set unchanged.
+     * @param listenerToRemove the listener to drop from the registered set
+     */
+    public synchronized void removeListener(ReplicationPeerConfigListener listenerToRemove) {
+      this.listeners.remove(listenerToRemove);
     }
 
     @Override
@@ -291,7 +313,7 @@ public class ReplicationPeerZKImpl extends 
ReplicationStateZKBase implements Rep
       if (path.equals(node)) {
         super.nodeCreated(path);
         ReplicationPeerConfig config = readPeerConfig();
-        if (listener != null){
+        for (ReplicationPeerConfigListener listener : listeners) {
           listener.peerConfigUpdated(config);
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index 71a222a..3f6b8ef 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -111,4 +111,10 @@ public abstract class BaseReplicationEndpoint extends 
AbstractService
     return false;
   }
 
+  /**
+   * Unregisters this endpoint from its peer's config change notifications. Does nothing when the
+   * endpoint has no context or the context has no peer.
+   */
+  public void close() {
+    if (this.ctx == null) {
+      return;
+    }
+    ReplicationPeer peer = this.ctx.getReplicationPeer();
+    // NOTE(review): assumes a context may be built without a peer (e.g. in tests) - confirm.
+    // The guard keeps an exception out of the stop path that invokes close().
+    if (peer != null) {
+      peer.removeListenerOfPeerConfig(this);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 6d3e70e..a9358a5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -89,6 +89,7 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
   @Override
   protected void doStop() {
     disconnect();
+    close();
     notifyStopped();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 2ce195c..6c3fc99 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -446,6 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
           "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
       abortable.abort(errMsg, new IOException(errMsg));
     }
+    close();
     notifyStopped();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1f91789..2396655 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -579,6 +579,10 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
           }
           if (allOtherTaskDone) {
             manager.closeRecoveredQueue(this.source);
+            // stop replication endpoint
+            if (source instanceof ReplicationSource) {
+              ((ReplicationSource) source).replicationEndpoint.stop();
+            }
             LOG.info("Finished recovering queue " + peerClusterZnode
                 + " with the following stats: " + getStats());
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15ed2e86/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java
new file mode 100644
index 0000000..01cfde0
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Checks the listener bookkeeping of a peer's {@code PeerConfigTracker}: one region server is
+ * aborted, then the test waits for the tracker on the surviving server to reach two listeners
+ * and afterwards to fall back to a single listener.
+ */
+@Category(MediumTests.class)
+public class TestReplicationConfigTracker extends TestReplicationBase {
+  private static final Log LOG = LogFactory.getLog(TestReplicationConfigTracker.class);
+
+  @Test
+  public void testReplicationConfigTracker() throws Exception {
+    // killing the RS with hbase:meta can result into failed puts until we solve
+    // IO fencing
+    int rsToKill1 = utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
+    int otherRs = rsToKill1 == 0 ? 1 : 0;
+    final HRegionServer regionServer = utility1.getHBaseCluster().getRegionServer(otherRs);
+    // Start polling before the abort so the two-listener state is not missed.
+    final Thread listenerTracker = trackListener(utility1, otherRs);
+    LOG.info("Start loading table");
+    utility1.loadTable(htable1, famName, true);
+    LOG.info("Done loading table");
+    utility1.getHBaseCluster().getRegionServer(rsToKill1).abort("Stopping as part of the test");
+    utility1.getHBaseCluster().waitOnRegionServer(rsToKill1);
+    while (utility1.getHBaseCluster().getMaster().getServerManager().areDeadServersInProgress()) {
+      LOG.info("Waiting on processing of crashed server before proceeding...");
+      Threads.sleep(1000);
+    }
+    // The polling thread exits only after it has seen two listeners on the tracker.
+    Waiter.waitFor(utility1.getConfiguration(), 20000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return !listenerTracker.isAlive();
+      }
+    });
+    // Afterwards the tracker is expected to go back to a single listener.
+    final ReplicationPeerZKImpl.PeerConfigTracker tracker = getPeerConfigTracker(regionServer);
+    Waiter.waitFor(utility1.getConfiguration(), 20000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return tracker.getListeners().size() == 1;
+      }
+    });
+  }
+
+  /**
+   * Starts a daemon thread that polls the peer config tracker of the given region server every
+   * 50 ms and terminates once the tracker reports exactly two listeners.
+   * @param utility testing utility that owns the cluster
+   * @param rs index of the region server whose tracker is polled
+   * @return the started polling thread
+   */
+  private static Thread trackListener(final HBaseTestingUtility utility, final int rs) {
+    Thread trackListener = new Thread() {
+      @Override
+      public void run() {
+        ReplicationPeerZKImpl.PeerConfigTracker peerConfigTracker =
+            getPeerConfigTracker(utility.getHBaseCluster().getRegionServer(rs));
+        while (peerConfigTracker.getListeners().size() != 2) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            // Keep polling: the test treats the end of this thread as proof that two
+            // listeners were observed, so an interrupt must not end it early.
+            LOG.error("track config failed", e);
+          }
+        }
+      }
+    };
+    trackListener.setDaemon(true);
+    trackListener.start();
+    return trackListener;
+  }
+
+  /**
+   * Looks up the peer config tracker for {@code PEER_ID} on the given region server.
+   * @param rs region server whose replication source service is inspected
+   * @return the tracker held by that server's {@link ReplicationPeerZKImpl} for the test peer
+   */
+  private static ReplicationPeerZKImpl.PeerConfigTracker getPeerConfigTracker(HRegionServer rs) {
+    Replication replication = (Replication) rs.getReplicationSourceService();
+    ReplicationSourceManager manager = replication.getReplicationManager();
+    ReplicationPeerZKImpl replicationPeerZK =
+        (ReplicationPeerZKImpl) manager.getReplicationPeers().getPeer(PEER_ID);
+    return replicationPeerZK.getPeerConfigTracker();
+  }
+}
\ No newline at end of file

Reply via email to