Repository: hbase
Updated Branches:
  refs/heads/master 9ec0ec492 -> af9d359b8


HBASE-17402 TestAsyncTableScan sometimes hangs


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/af9d359b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/af9d359b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/af9d359b

Branch: refs/heads/master
Commit: af9d359b8eb7933bd20f1dcaa2ddb1b27059d23e
Parents: 9ec0ec4
Author: zhangduo <zhang...@apache.org>
Authored: Tue Feb 7 09:31:38 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Feb 7 09:42:42 2017 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncNonMetaRegionLocator.java | 56 ++++++++++----------
 .../client/TestAsyncNonMetaRegionLocator.java   | 36 +++++++++++--
 2 files changed, 60 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/af9d359b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 27e8cc4..dcf2c91 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -140,8 +139,8 @@ class AsyncNonMetaRegionLocator {
       return;
     }
     tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, 
oldLoc) -> {
-      if (oldLoc.getSeqNum() > loc.getSeqNum()
-          || !oldLoc.getServerName().equals(loc.getServerName())) {
+      if (oldLoc.getSeqNum() > loc.getSeqNum() ||
+          !oldLoc.getServerName().equals(loc.getServerName())) {
         return oldLoc;
       }
       return null;
@@ -158,11 +157,11 @@ class AsyncNonMetaRegionLocator {
     if (oldLoc == null) {
       return true;
     }
-    if (oldLoc.getSeqNum() > loc.getSeqNum()
-        || oldLoc.getServerName().equals(loc.getServerName())) {
+    if (oldLoc.getSeqNum() > loc.getSeqNum() ||
+        oldLoc.getServerName().equals(loc.getServerName())) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + 
oldLoc
-            + " is newer than us or has the same server name");
+        LOG.trace("Will not add " + loc + " to cache because the old value " + 
oldLoc +
+            " is newer than us or has the same server name");
       }
       return false;
     }
@@ -171,9 +170,9 @@ class AsyncNonMetaRegionLocator {
         return loc;
       }
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + 
oldValue
-            + " is newer than us or has the same server name."
-            + " Maybe it is updated before we replace it");
+        LOG.trace("Will not add " + loc + " to cache because the old value " + 
oldValue +
+            " is newer than us or has the same server name." +
+            " Maybe it is updated before we replace it");
       }
       return oldValue;
     });
@@ -217,8 +216,8 @@ class AsyncNonMetaRegionLocator {
       Throwable error) {
     if (error != null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to locate region in '" + tableName + "', row='"
-            + Bytes.toStringBinary(req.row) + "', locateType=" + 
req.locateType,
+        LOG.debug("Failed to locate region in '" + tableName + "', row='" +
+            Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
           error);
       }
     }
@@ -250,14 +249,15 @@ class AsyncNonMetaRegionLocator {
           }
         }
       }
-      if (!tableCache.allRequests.isEmpty()
-          && tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
+      if (!tableCache.allRequests.isEmpty() &&
+          tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
         LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
             .filter(r -> 
!tableCache.isPending(r)).toArray(LocateRequest[]::new);
         if (candidates.length > 0) {
           // TODO: use a better algorithm to send a request which is more 
likely to fetch a new
           // location.
           toSend = 
candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
+          tableCache.send(toSend);
         }
       }
     }
@@ -278,8 +278,8 @@ class AsyncNonMetaRegionLocator {
     }
     RegionLocations locs = 
MetaTableAccessor.getRegionLocations(results.get(0));
     if (LOG.isDebugEnabled()) {
-      LOG.debug("The fetched location of '" + tableName + "', row='" + 
Bytes.toStringBinary(req.row)
-          + "', locateType=" + req.locateType + " is " + locs);
+      LOG.debug("The fetched location of '" + tableName + "', row='" +
+          Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + 
" is " + locs);
     }
     if (locs == null || locs.getDefaultRegionLocation() == null) {
       complete(tableName, req, null,
@@ -303,13 +303,13 @@ class AsyncNonMetaRegionLocator {
     if (info.isSplit()) {
       complete(tableName, req, null,
         new RegionOfflineException(
-            "the only available region for the required row is a split parent,"
-                + " the daughters should be online soon: '" + 
info.getRegionNameAsString() + "'"));
+            "the only available region for the required row is a split 
parent," +
+                " the daughters should be online soon: '" + 
info.getRegionNameAsString() + "'"));
       return;
     }
     if (info.isOffline()) {
-      complete(tableName, req, null, new RegionOfflineException("the region is 
offline, could"
-          + " be caused by a disable table call: '" + 
info.getRegionNameAsString() + "'"));
+      complete(tableName, req, null, new RegionOfflineException("the region is 
offline, could" +
+          " be caused by a disable table call: '" + 
info.getRegionNameAsString() + "'"));
       return;
     }
     if (loc.getServerName() == null) {
@@ -331,8 +331,8 @@ class AsyncNonMetaRegionLocator {
     byte[] endKey = loc.getRegionInfo().getEndKey();
     if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
-            + Bytes.toStringBinary(row) + "', locateType=" + 
RegionLocateType.CURRENT);
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
+            Bytes.toStringBinary(row) + "', locateType=" + 
RegionLocateType.CURRENT);
       }
       return loc;
     } else {
@@ -348,11 +348,11 @@ class AsyncNonMetaRegionLocator {
       return null;
     }
     HRegionLocation loc = entry.getValue();
-    if (isEmptyStopRow(loc.getRegionInfo().getEndKey())
-        || Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) {
+    if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) ||
+        Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
-            + Bytes.toStringBinary(row) + "', locateType=" + 
RegionLocateType.BEFORE);
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
+            Bytes.toStringBinary(row) + "', locateType=" + 
RegionLocateType.BEFORE);
       }
       return loc;
     } else {
@@ -362,8 +362,8 @@ class AsyncNonMetaRegionLocator {
 
   private void locateInMeta(TableName tableName, LocateRequest req) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Try locate '" + tableName + "', row='" + 
Bytes.toStringBinary(req.row)
-          + "', locateType=" + req.locateType + " in meta");
+      LOG.trace("Try locate '" + tableName + "', row='" + 
Bytes.toStringBinary(req.row) +
+          "', locateType=" + req.locateType + " in meta");
     }
     byte[] metaKey;
     if (req.locateType.equals(RegionLocateType.BEFORE)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/af9d359b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 40fca72..fa1346a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -27,6 +28,8 @@ import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.IntStream;
@@ -166,10 +169,7 @@ public class TestAsyncNonMetaRegionLocator {
     return endKeys;
   }
 
-  @Test
-  public void testMultiRegionTable() throws IOException, InterruptedException {
-    createMultiRegionTable();
-    byte[][] startKeys = getStartKeys();
+  private ServerName[] getLocations(byte[][] startKeys) {
     ServerName[] serverNames = new ServerName[startKeys.length];
     TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> 
t.getRegionServer())
         .forEach(rs -> {
@@ -178,6 +178,14 @@ public class TestAsyncNonMetaRegionLocator {
               Bytes::compareTo)] = rs.getServerName();
           });
         });
+    return serverNames;
+  }
+
+  @Test
+  public void testMultiRegionTable() throws IOException, InterruptedException {
+    createMultiRegionTable();
+    byte[][] startKeys = getStartKeys();
+    ServerName[] serverNames = getLocations(startKeys);
     IntStream.range(0, 2).forEach(n -> IntStream.range(0, 
startKeys.length).forEach(i -> {
       try {
         assertLocEquals(startKeys[i], i == startKeys.length - 1 ? 
EMPTY_END_ROW : startKeys[i + 1],
@@ -264,4 +272,24 @@ public class TestAsyncNonMetaRegionLocator {
 
     assertSame(afterLoc, LOCATOR.getRegionLocation(TABLE_NAME, row, 
RegionLocateType.AFTER).get());
   }
+
+  // For HBASE-17402
+  @Test
+  public void testConcurrentLocate() throws IOException, InterruptedException, 
ExecutionException {
+    createMultiRegionTable();
+    byte[][] startKeys = getStartKeys();
+    byte[][] endKeys = getEndKeys();
+    ServerName[] serverNames = getLocations(startKeys);
+    for (int i = 0; i < 100; i++) {
+      LOCATOR.clearCache(TABLE_NAME);
+      List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 
1000)
+          .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
+          .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, 
RegionLocateType.CURRENT))
+          .collect(toList());
+      for (int j = 0; j < 1000; j++) {
+        int index = Math.min(8, j / 111);
+        assertLocEquals(startKeys[index], endKeys[index], serverNames[index], 
futures.get(j).get());
+      }
+    }
+  }
 }

Reply via email to