HBASE-18569 Add prefetch support for async region locator
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4caf2fb0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4caf2fb0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4caf2fb0 Branch: refs/heads/branch-2.0 Commit: 4caf2fb0d848821d96b98dc449589eae0124b2b7 Parents: 40d2787 Author: zhangduo <zhang...@apache.org> Authored: Fri Jun 22 08:48:33 2018 +0800 Committer: Duo Zhang <zhang...@apache.org> Committed: Thu Jan 3 10:55:06 2019 +0800 ---------------------------------------------------------------------- .../hbase/client/AsyncNonMetaRegionLocator.java | 75 +++++++++++++++--- .../client/TestAsyncTableLocatePrefetch.java | 82 ++++++++++++++++++++ 2 files changed, 145 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4caf2fb0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index f6d74a5..7e3d56c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -52,6 +52,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + /** * The asynchronous locator for regions other than meta. */ @@ -60,15 +62,23 @@ class AsyncNonMetaRegionLocator { private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class); + @VisibleForTesting static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = "hbase.client.meta.max.concurrent.locate.per.table"; private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8; + @VisibleForTesting + static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit"; + + private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10; + private final AsyncConnectionImpl conn; private final int maxConcurrentLocateRequestPerTable; + private final int locatePrefetchLimit; + private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>(); private static final class LocateRequest { @@ -168,6 +178,8 @@ class AsyncNonMetaRegionLocator { this.conn = conn; this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); + this.locatePrefetchLimit = + conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT); } private TableCache getTableCache(TableName tableName) { @@ -223,9 +235,7 @@ class AsyncNonMetaRegionLocator { justification = "Called by lambda expression") private void addToCache(HRegionLocation loc) { addToCache(getTableCache(loc.getRegion().getTable()), loc); - if (LOG.isTraceEnabled()) { - LOG.trace("Try adding " + loc + " to cache"); - } + LOG.trace("Try adding {} to cache", loc); } private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, @@ -271,8 +281,10 @@ class AsyncNonMetaRegionLocator { // return whether we should stop the scan private boolean onScanNext(TableName tableName, LocateRequest req, Result result) { RegionLocations locs = MetaTableAccessor.getRegionLocations(result); - LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName, - Bytes.toStringBinary(req.row), req.locateType, locs); + if (LOG.isDebugEnabled()) { + LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName, + Bytes.toStringBinary(req.row), req.locateType, locs); + } if (locs == null || locs.getDefaultRegionLocation() == null) { complete(tableName, req, null, @@ -294,8 +306,8 @@ class AsyncNonMetaRegionLocator { if (loc.getServerName() == null) { complete(tableName, req, null, new IOException( - String.format("No server address listed for region '%s', row='%s', locateType=%s", - info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType))); + String.format("No server address listed for region '%s', row='%s', locateType=%s", + info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType))); return true; } complete(tableName, req, loc, null); @@ -361,7 +373,7 @@ class AsyncNonMetaRegionLocator { RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); conn.getTable(META_TABLE_NAME) .scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) - .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5) + .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit) .setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() { private boolean completeNormally = false; @@ -385,12 +397,41 @@ class AsyncNonMetaRegionLocator { @Override public void onNext(Result[] results, ScanController controller) { - for (Result result : results) { - tableNotFound = false; - if (onScanNext(tableName, req, result)) { + if (results.length == 0) { + return; + } + tableNotFound = false; + int i = 0; + for (; i < results.length; i++) { + if (onScanNext(tableName, req, results[i])) { completeNormally = true; controller.terminate(); - return; + i++; + break; + } + } + // Add the remaining results into cache + if (i < results.length) { + TableCache tableCache = getTableCache(tableName); + for (; i < results.length; i++) { + RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]); + if (locs == null) { + continue; + } + HRegionLocation loc = locs.getDefaultRegionLocation(); + if (loc == null) { + continue; + } + RegionInfo info = loc.getRegion(); + if (info == null || info.isOffline() || info.isSplitParent() || + loc.getServerName() == null) { + continue; + } + if (addToCache(tableCache, loc)) { + synchronized (tableCache) { + tableCache.clearCompletedRequests(Optional.of(loc)); + } + } } } } @@ -482,4 +523,14 @@ class AsyncNonMetaRegionLocator { } } } + + // only used for testing whether we have cached the location for a region. + @VisibleForTesting + HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) { + TableCache tableCache = cache.get(tableName); + if (tableCache == null) { + return null; + } + return locateRowInCache(tableCache, tableName, row); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4caf2fb0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java new file mode 100644 index 0000000..13d8000 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertNotNull; + +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableLocatePrefetch { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableLocatePrefetch.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static AsyncConnection CONN; + + private static AsyncNonMetaRegionLocator LOCATOR; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(AsyncNonMetaRegionLocator.LOCATE_PREFETCH_LIMIT, 100); + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN); + } + + @AfterClass + public static void tearDown() throws Exception { + Closeables.close(CONN, true); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws InterruptedException, ExecutionException { + assertNotNull(LOCATOR + .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get()); + // we finish the request before we adding the remaining results to cache so sleep a bit here + Thread.sleep(1000); + // confirm that the locations of all the regions have been cached. + assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, Bytes.toBytes("aaa"))); + for (byte[] row : HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE) { + assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, row)); + } + } +}