Repository: hbase
Updated Branches:
  refs/heads/master 9ec0ec492 -> af9d359b8
HBASE-17402 TestAsyncTableScan sometimes hangs Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/af9d359b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/af9d359b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/af9d359b Branch: refs/heads/master Commit: af9d359b8eb7933bd20f1dcaa2ddb1b27059d23e Parents: 9ec0ec4 Author: zhangduo <zhang...@apache.org> Authored: Tue Feb 7 09:31:38 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Feb 7 09:42:42 2017 +0800 ---------------------------------------------------------------------- .../hbase/client/AsyncNonMetaRegionLocator.java | 56 ++++++++++---------- .../client/TestAsyncNonMetaRegionLocator.java | 36 +++++++++++-- 2 files changed, 60 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/af9d359b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 27e8cc4..dcf2c91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.util.Bytes; /** @@ -140,8 +139,8 @@ class AsyncNonMetaRegionLocator { return; } tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> 
{ - if (oldLoc.getSeqNum() > loc.getSeqNum() - || !oldLoc.getServerName().equals(loc.getServerName())) { + if (oldLoc.getSeqNum() > loc.getSeqNum() || + !oldLoc.getServerName().equals(loc.getServerName())) { return oldLoc; } return null; @@ -158,11 +157,11 @@ class AsyncNonMetaRegionLocator { if (oldLoc == null) { return true; } - if (oldLoc.getSeqNum() > loc.getSeqNum() - || oldLoc.getServerName().equals(loc.getServerName())) { + if (oldLoc.getSeqNum() > loc.getSeqNum() || + oldLoc.getServerName().equals(loc.getServerName())) { if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc - + " is newer than us or has the same server name"); + LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + + " is newer than us or has the same server name"); } return false; } @@ -171,9 +170,9 @@ class AsyncNonMetaRegionLocator { return loc; } if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue - + " is newer than us or has the same server name." - + " Maybe it is updated before we replace it"); + LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + + " is newer than us or has the same server name." 
+ + " Maybe it is updated before we replace it"); } return oldValue; }); @@ -217,8 +216,8 @@ class AsyncNonMetaRegionLocator { Throwable error) { if (error != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Failed to locate region in '" + tableName + "', row='" - + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, + LOG.debug("Failed to locate region in '" + tableName + "', row='" + + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error); } } @@ -250,14 +249,15 @@ class AsyncNonMetaRegionLocator { } } } - if (!tableCache.allRequests.isEmpty() - && tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) { + if (!tableCache.allRequests.isEmpty() && + tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) { LocateRequest[] candidates = tableCache.allRequests.keySet().stream() .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new); if (candidates.length > 0) { // TODO: use a better algorithm to send a request which is more likely to fetch a new // location. 
toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)]; + tableCache.send(toSend); } } } @@ -278,8 +278,8 @@ class AsyncNonMetaRegionLocator { } RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); if (LOG.isDebugEnabled()) { - LOG.debug("The fetched location of '" + tableName + "', row='" + Bytes.toStringBinary(req.row) - + "', locateType=" + req.locateType + " is " + locs); + LOG.debug("The fetched location of '" + tableName + "', row='" + + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs); } if (locs == null || locs.getDefaultRegionLocation() == null) { complete(tableName, req, null, @@ -303,13 +303,13 @@ class AsyncNonMetaRegionLocator { if (info.isSplit()) { complete(tableName, req, null, new RegionOfflineException( - "the only available region for the required row is a split parent," - + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); + "the only available region for the required row is a split parent," + + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); return; } if (info.isOffline()) { - complete(tableName, req, null, new RegionOfflineException("the region is offline, could" - + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); + complete(tableName, req, null, new RegionOfflineException("the region is offline, could" + + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); return; } if (loc.getServerName() == null) { @@ -331,8 +331,8 @@ class AsyncNonMetaRegionLocator { byte[] endKey = loc.getRegionInfo().getEndKey(); if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" - + Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT); + LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + + Bytes.toStringBinary(row) 
+ "', locateType=" + RegionLocateType.CURRENT); } return loc; } else { @@ -348,11 +348,11 @@ class AsyncNonMetaRegionLocator { return null; } HRegionLocation loc = entry.getValue(); - if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) - || Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) { + if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) || + Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) { if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" - + Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); + LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + + Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); } return loc; } else { @@ -362,8 +362,8 @@ class AsyncNonMetaRegionLocator { private void locateInMeta(TableName tableName, LocateRequest req) { if (LOG.isTraceEnabled()) { - LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) - + "', locateType=" + req.locateType + " in meta"); + LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + + "', locateType=" + req.locateType + " in meta"); } byte[] metaKey; if (req.locateType.equals(RegionLocateType.BEFORE)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/af9d359b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index 40fca72..fa1346a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static 
java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; import static org.hamcrest.CoreMatchers.instanceOf; @@ -27,6 +28,8 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.IntStream; @@ -166,10 +169,7 @@ public class TestAsyncNonMetaRegionLocator { return endKeys; } - @Test - public void testMultiRegionTable() throws IOException, InterruptedException { - createMultiRegionTable(); - byte[][] startKeys = getStartKeys(); + private ServerName[] getLocations(byte[][] startKeys) { ServerName[] serverNames = new ServerName[startKeys.length]; TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) .forEach(rs -> { @@ -178,6 +178,14 @@ public class TestAsyncNonMetaRegionLocator { Bytes::compareTo)] = rs.getServerName(); }); }); + return serverNames; + } + + @Test + public void testMultiRegionTable() throws IOException, InterruptedException { + createMultiRegionTable(); + byte[][] startKeys = getStartKeys(); + ServerName[] serverNames = getLocations(startKeys); IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { try { assertLocEquals(startKeys[i], i == startKeys.length - 1 ? 
EMPTY_END_ROW : startKeys[i + 1], @@ -264,4 +272,24 @@ public class TestAsyncNonMetaRegionLocator { assertSame(afterLoc, LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER).get()); } + + // For HBASE-17402 + @Test + public void testConcurrentLocate() throws IOException, InterruptedException, ExecutionException { + createMultiRegionTable(); + byte[][] startKeys = getStartKeys(); + byte[][] endKeys = getEndKeys(); + ServerName[] serverNames = getLocations(startKeys); + for (int i = 0; i < 100; i++) { + LOCATOR.clearCache(TABLE_NAME); + List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000) + .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s)) + .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT)) + .collect(toList()); + for (int j = 0; j < 1000; j++) { + int index = Math.min(8, j / 111); + assertLocEquals(startKeys[index], endKeys[index], serverNames[index], futures.get(j).get()); + } + } + } }