This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new eb912bb HBASE-22261 Make use of ClusterStatusListener for async client
eb912bb is described below
commit eb912bb1dab793248f47d6ed8ee6f8abb467d10b
Author: zhangduo <[email protected]>
AuthorDate: Sun Apr 21 12:03:11 2019 +0800
HBASE-22261 Make use of ClusterStatusListener for async client
---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 32 +++++++++
.../hbase/client/AsyncMetaRegionLocator.java | 20 ++++++
.../hbase/client/AsyncNonMetaRegionLocator.java | 19 +++++
.../hadoop/hbase/client/AsyncRegionLocator.java | 21 +++++-
.../hbase/client/TestAsyncTableRSCrashPublish.java | 84 ++++++++++++++++++++++
5 files changed, 173 insertions(+), 3 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 7d59984..f58dfba 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
+import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
+import static
org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
+import static
org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
@@ -112,6 +116,8 @@ class AsyncConnectionImpl implements AsyncConnection {
private final Optional<MetricsConnection> metrics;
+ private final ClusterStatusListener clusterStatusListener;
+
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry,
String clusterId,
User user) {
this.conf = conf;
@@ -140,6 +146,31 @@ class AsyncConnectionImpl implements AsyncConnection {
}
this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
+ ClusterStatusListener listener = null;
+ if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
+ // TODO: this maybe a blocking operation, better to create it outside
the constructor and pass
+ // it in, just like clusterId. Not a big problem for now as the default
value is false.
+ Class<? extends ClusterStatusListener.Listener> listenerClass =
conf.getClass(
+ STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
+ if (listenerClass == null) {
+ LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED,
STATUS_LISTENER_CLASS);
+ } else {
+ try {
+ listener = new ClusterStatusListener(
+ new ClusterStatusListener.DeadServerHandler() {
+ @Override
+ public void newDead(ServerName sn) {
+ locator.clearCache(sn);
+ rpcClient.cancelConnections(sn);
+ }
+ }, conf, listenerClass);
+ } catch (IOException e) {
+ LOG.warn("Failed to create ClusterStatusListener, not a critical
problem, ignoring...",
+ e);
+ }
+ }
+ }
+ this.clusterStatusListener = listener;
}
private void spawnRenewalChore(final UserGroupInformation user) {
@@ -159,6 +190,7 @@ class AsyncConnectionImpl implements AsyncConnection {
if (closed) {
return;
}
+ IOUtils.closeQuietly(clusterStatusListener);
IOUtils.closeQuietly(rpcClient);
IOUtils.closeQuietly(registry);
if (authService != null) {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 54bf9ff..9cf8bc6 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -113,4 +114,23 @@ class AsyncMetaRegionLocator {
void clearCache() {
metaRegionLocations.set(null);
}
+
+ void clearCache(ServerName serverName) {
+ for (;;) {
+ RegionLocations locs = metaRegionLocations.get();
+ if (locs == null) {
+ return;
+ }
+ RegionLocations newLocs = locs.removeByServer(serverName);
+ if (locs == newLocs) {
+ return;
+ }
+ if (newLocs.isEmpty()) {
+ newLocs = null;
+ }
+ if (metaRegionLocations.compareAndSet(locs, newLocs)) {
+ return;
+ }
+ }
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 069a324..fd8fcdb 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Scan.ReadType;
@@ -617,6 +618,24 @@ class AsyncNonMetaRegionLocator {
cache.clear();
}
+ void clearCache(ServerName serverName) {
+ for (TableCache tableCache : cache.values()) {
+ for (Map.Entry<byte[], RegionLocations> entry :
tableCache.cache.entrySet()) {
+ byte[] regionName = entry.getKey();
+ RegionLocations locs = entry.getValue();
+ RegionLocations newLocs = locs.removeByServer(serverName);
+ if (locs == newLocs) {
+ continue;
+ }
+ if (newLocs.isEmpty()) {
+ tableCache.cache.remove(regionName, locs);
+ } else {
+ tableCache.cache.replace(regionName, locs, newLocs);
+ }
+ }
+ }
+ }
+
// only used for testing whether we have cached the location for a region.
@VisibleForTesting
RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 3eb44b7..9e1d5e8 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -33,6 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -46,11 +48,14 @@ class AsyncRegionLocator {
private final HashedWheelTimer retryTimer;
+ private final AsyncConnectionImpl conn;
+
private final AsyncMetaRegionLocator metaRegionLocator;
private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+ this.conn = conn;
this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
this.retryTimer = retryTimer;
@@ -150,9 +155,7 @@ class AsyncRegionLocator {
}
void clearCache(TableName tableName) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Clear meta cache for " + tableName);
- }
+ LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
metaRegionLocator.clearCache();
} else {
@@ -160,8 +163,20 @@ class AsyncRegionLocator {
}
}
+ void clearCache(ServerName serverName) {
+ LOG.debug("Clear meta cache for {}", serverName);
+ metaRegionLocator.clearCache(serverName);
+ nonMetaRegionLocator.clearCache(serverName);
+
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+ }
+
void clearCache() {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}
+
+ @VisibleForTesting
+ AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
+ return nonMetaRegionLocator;
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
new file mode 100644
index 0000000..849feb8
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRSCrashPublish {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableRSCrashPublish.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static AsyncConnectionImpl CONN;
+
+ private static TableName TABLE_NAME = TableName.valueOf("Publish");
+
+ private static byte[] FAMILY = Bytes.toBytes("family");
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
+ UTIL.startMiniCluster(2);
+ UTIL.createTable(TABLE_NAME, FAMILY);
+ UTIL.waitTableAvailable(TABLE_NAME);
+ CONN =
+ (AsyncConnectionImpl)
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ Closeables.close(CONN, true);
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void test() throws IOException {
+ AsyncNonMetaRegionLocator locator =
CONN.getLocator().getNonMetaRegionLocator();
+ CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
+ ServerName serverName = locator.getRegionLocationInCache(TABLE_NAME,
HConstants.EMPTY_START_ROW)
+ .getDefaultRegionLocation().getServerName();
+ UTIL.getMiniHBaseCluster().stopRegionServer(serverName);
+ UTIL.waitFor(60000,
+ () -> locator.getRegionLocationInCache(TABLE_NAME,
HConstants.EMPTY_START_ROW) == null);
+ CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
+ assertNotEquals(serverName,
+ locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
+ .getDefaultRegionLocation().getServerName());
+ }
+}