This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 50149ce  HBASE-22261 Make use of ClusterStatusListener for async client
50149ce is described below

commit 50149ce7a7a17a14e43d2a947646c9743e63bb04
Author: zhangduo <[email protected]>
AuthorDate: Sun Apr 21 12:03:11 2019 +0800

    HBASE-22261 Make use of ClusterStatusListener for async client
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 32 +++++++++
 .../hbase/client/AsyncMetaRegionLocator.java       | 20 ++++++
 .../hbase/client/AsyncNonMetaRegionLocator.java    | 19 +++++
 .../hadoop/hbase/client/AsyncRegionLocator.java    | 21 +++++-
 .../hbase/client/TestAsyncTableRSCrashPublish.java | 84 ++++++++++++++++++++++
 5 files changed, 173 insertions(+), 3 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 7d59984..f58dfba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
+import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
+import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
+import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
@@ -112,6 +116,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final Optional<MetricsConnection> metrics;
 
+  private final ClusterStatusListener clusterStatusListener;
+
   public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
       User user) {
     this.conf = conf;
@@ -140,6 +146,31 @@ class AsyncConnectionImpl implements AsyncConnection {
     }
     this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
+    ClusterStatusListener listener = null;
+    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
+      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
+      // it in, just like clusterId. Not a big problem for now as the default value is false.
+      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
+        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
+      if (listenerClass == null) {
+        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
+      } else {
+        try {
+          listener = new ClusterStatusListener(
+            new ClusterStatusListener.DeadServerHandler() {
+              @Override
+              public void newDead(ServerName sn) {
+                locator.clearCache(sn);
+                rpcClient.cancelConnections(sn);
+              }
+            }, conf, listenerClass);
+        } catch (IOException e) {
+          LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
+            e);
+        }
+      }
+    }
+    this.clusterStatusListener = listener;
   }
 
   private void spawnRenewalChore(final UserGroupInformation user) {
@@ -159,6 +190,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     if (closed) {
       return;
     }
+    IOUtils.closeQuietly(clusterStatusListener);
     IOUtils.closeQuietly(rpcClient);
     IOUtils.closeQuietly(registry);
     if (authService != null) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 54bf9ff..9cf8bc6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -113,4 +114,23 @@ class AsyncMetaRegionLocator {
   void clearCache() {
     metaRegionLocations.set(null);
   }
+
+  void clearCache(ServerName serverName) {
+    for (;;) {
+      RegionLocations locs = metaRegionLocations.get();
+      if (locs == null) {
+        return;
+      }
+      RegionLocations newLocs = locs.removeByServer(serverName);
+      if (locs == newLocs) {
+        return;
+      }
+      if (newLocs.isEmpty()) {
+        newLocs = null;
+      }
+      if (metaRegionLocations.compareAndSet(locs, newLocs)) {
+        return;
+      }
+    }
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index bbb84d0..398655f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Scan.ReadType;
@@ -613,6 +614,24 @@ class AsyncNonMetaRegionLocator {
     cache.clear();
   }
 
+  void clearCache(ServerName serverName) {
+    for (TableCache tableCache : cache.values()) {
+      for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
+        byte[] regionName = entry.getKey();
+        RegionLocations locs = entry.getValue();
+        RegionLocations newLocs = locs.removeByServer(serverName);
+        if (locs == newLocs) {
+          continue;
+        }
+        if (newLocs.isEmpty()) {
+          tableCache.cache.remove(regionName, locs);
+        } else {
+          tableCache.cache.replace(regionName, locs, newLocs);
+        }
+      }
+    }
+  }
+
   // only used for testing whether we have cached the location for a region.
   @VisibleForTesting
   RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 3eb44b7..9e1d5e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -33,6 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 
@@ -46,11 +48,14 @@ class AsyncRegionLocator {
 
   private final HashedWheelTimer retryTimer;
 
+  private final AsyncConnectionImpl conn;
+
   private final AsyncMetaRegionLocator metaRegionLocator;
 
   private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
 
   AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+    this.conn = conn;
     this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
     this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
     this.retryTimer = retryTimer;
@@ -150,9 +155,7 @@ class AsyncRegionLocator {
   }
 
   void clearCache(TableName tableName) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Clear meta cache for " + tableName);
-    }
+    LOG.debug("Clear meta cache for {}", tableName);
     if (tableName.equals(META_TABLE_NAME)) {
       metaRegionLocator.clearCache();
     } else {
@@ -160,8 +163,20 @@ class AsyncRegionLocator {
     }
   }
 
+  void clearCache(ServerName serverName) {
+    LOG.debug("Clear meta cache for {}", serverName);
+    metaRegionLocator.clearCache(serverName);
+    nonMetaRegionLocator.clearCache(serverName);
+    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+  }
+
   void clearCache() {
     metaRegionLocator.clearCache();
     nonMetaRegionLocator.clearCache();
   }
+
+  @VisibleForTesting
+  AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
+    return nonMetaRegionLocator;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
new file mode 100644
index 0000000..849feb8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRSCrashPublish {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableRSCrashPublish.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static AsyncConnectionImpl CONN;
+
+  private static TableName TABLE_NAME = TableName.valueOf("Publish");
+
+  private static byte[] FAMILY = Bytes.toBytes("family");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
+    UTIL.startMiniCluster(2);
+    UTIL.createTable(TABLE_NAME, FAMILY);
+    UTIL.waitTableAvailable(TABLE_NAME);
+    CONN =
+      (AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws IOException {
+    AsyncNonMetaRegionLocator locator = CONN.getLocator().getNonMetaRegionLocator();
+    CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
+    ServerName serverName = locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
+      .getDefaultRegionLocation().getServerName();
+    UTIL.getMiniHBaseCluster().stopRegionServer(serverName);
+    UTIL.waitFor(60000,
+      () -> locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW) == null);
+    CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
+    assertNotEquals(serverName,
+      locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
+        .getDefaultRegionLocation().getServerName());
+  }
+}

Reply via email to