HBASE-18319 Implement 
getClusterStatus/getRegionLoad/getCompactionState/getLastMajorCompactionTimestamp
 methods


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0a5fa0c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0a5fa0c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0a5fa0c

Branch: refs/heads/HBASE-14070.HLC
Commit: b0a5fa0c2a119168c4272e5efba16a3ef9e9c329
Parents: 4fe7385
Author: Guanghao Zhang <zg...@apache.org>
Authored: Wed Jul 5 18:33:57 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Fri Jul 7 16:21:45 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  95 ++++++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    |  44 ++++
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 219 ++++++++++++++++++-
 .../hbase/shaded/protobuf/ProtobufUtil.java     |  11 +-
 .../hbase/shaded/protobuf/RequestConverter.java |  16 +-
 .../hbase/client/TestAsyncClusterAdminApi.java  | 132 +++++++++++
 .../hbase/client/TestAsyncRegionAdminApi.java   |   8 +-
 .../hbase/client/TestAsyncTableAdminApi.java    |  81 ++++++-
 8 files changed, 591 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index ff35d46..8ade209 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Collection;
 import java.util.Map;
@@ -24,8 +25,10 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -332,6 +335,11 @@ public interface AsyncAdmin {
   CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName);
 
   /**
+   * Get the regions of a given table.
+   */
+  CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName);
+
+  /**
    * Flush a table.
    * @param tableName table to flush
    */
@@ -796,4 +804,91 @@ public interface AsyncAdmin {
    * @return procedure list wrapped by {@link CompletableFuture}
    */
   CompletableFuture<List<ProcedureInfo>> listProcedures();
+
+  /**
+   * @return cluster status wrapped by {@link CompletableFuture}
+   */
+  CompletableFuture<ClusterStatus> getClusterStatus();
+
+  /**
+   * @return current master server name wrapped by {@link CompletableFuture}
+   */
+  default CompletableFuture<ServerName> getMaster() {
+    return getClusterStatus().thenApply(ClusterStatus::getMaster);
+  }
+
+  /**
+   * @return current backup master list wrapped by {@link CompletableFuture}
+   */
+  default CompletableFuture<Collection<ServerName>> getBackupMasters() {
+    return getClusterStatus().thenApply(ClusterStatus::getBackupMasters);
+  }
+
+  /**
+   * @return current live region servers list wrapped by {@link 
CompletableFuture}
+   */
+  default CompletableFuture<Collection<ServerName>> getRegionServers() {
+    return getClusterStatus().thenApply(ClusterStatus::getServers);
+  }
+
+  /**
+   * Get a list of {@link RegionLoad} of all regions hosted on a region 
seerver.
+   * @param serverName
+   * @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
+   */
+  default CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName 
serverName) {
+    return getRegionLoads(serverName, Optional.empty());
+  }
+
+  /**
+   * Get a list of {@link RegionLoad} of all regions hosted on a region 
seerver for a table.
+   * @param serverName
+   * @param tableName
+   * @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
+   */
+  CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
+      Optional<TableName> tableName);
+
+  /**
+   * Check whether master is in maintenance mode
+   * @return true if master is in maintenance mode, false otherwise. The 
return value will be
+   *         wrapped by a {@link CompletableFuture}
+   */
+  CompletableFuture<Boolean> isMasterInMaintenanceMode();
+
+  /**
+   * Get the current compaction state of a table. It could be in a major 
compaction, a minor
+   * compaction, both, or none.
+   * @param tableName table to examine
+   * @return the current compaction state wrapped by a {@link 
CompletableFuture}
+   */
+  CompletableFuture<CompactionState> getCompactionState(TableName tableName);
+
+  /**
+   * Get the current compaction state of region. It could be in a major 
compaction, a minor
+   * compaction, both, or none.
+   * @param regionName region to examine
+   * @return the current compaction state wrapped by a {@link 
CompletableFuture}
+   */
+  CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] 
regionName);
+
+  /**
+   * Get the timestamp of the last major compaction for the passed table.
+   * <p>
+   * The timestamp of the oldest HFile resulting from a major compaction of 
that table, or not
+   * present if no such HFile could be found.
+   * @param tableName table to examine
+   * @return the last major compaction timestamp wrapped by a {@link 
CompletableFuture}
+   */
+  CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName 
tableName);
+
+  /**
+   * Get the timestamp of the last major compaction for the passed region.
+   * <p>
+   * The timestamp of the oldest HFile resulting from a major compaction of 
that region, or not
+   * present if no such HFile could be found.
+   * @param regionName region to examine
+   * @return the last major compaction timestamp wrapped by a {@link 
CompletableFuture}
+   */
+  CompletableFuture<Optional<Long>> 
getLastMajorCompactionTimestampForRegion(byte[] regionName);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 36fd60d..2998133 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -27,8 +27,10 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -225,6 +227,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
+  public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName 
tableName) {
+    return wrap(rawAdmin.getTableRegions(tableName));
+  }
+
+  @Override
   public CompletableFuture<Void> flush(TableName tableName) {
     return wrap(rawAdmin.flush(tableName));
   }
@@ -445,4 +452,41 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   public CompletableFuture<List<ProcedureInfo>> listProcedures() {
     return wrap(rawAdmin.listProcedures());
   }
+
+  @Override
+  public CompletableFuture<ClusterStatus> getClusterStatus() {
+    return wrap(rawAdmin.getClusterStatus());
+  }
+
+  @Override
+  public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName 
serverName,
+      Optional<TableName> tableName) {
+    return wrap(rawAdmin.getRegionLoads(serverName, tableName));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
+    return wrap(rawAdmin.isMasterInMaintenanceMode());
+  }
+
+  @Override
+  public CompletableFuture<CompactionState> getCompactionState(TableName 
tableName) {
+    return wrap(rawAdmin.getCompactionState(tableName));
+  }
+
+  @Override
+  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] 
regionName) {
+    return wrap(rawAdmin.getCompactionStateForRegion(regionName));
+  }
+
+  @Override
+  public CompletableFuture<Optional<Long>> 
getLastMajorCompactionTimestamp(TableName tableName) {
+    return wrap(rawAdmin.getLastMajorCompactionTimestamp(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Optional<Long>> 
getLastMajorCompactionTimestampForRegion(
+      byte[] regionName) {
+    return wrap(rawAdmin.getLastMajorCompactionTimestampForRegion(regionName));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 179fd7d..b119754 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -46,12 +46,14 @@ import java.util.stream.Stream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -89,10 +91,15 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
@@ -115,6 +122,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@@ -133,6 +142,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
@@ -141,6 +152,9 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
@@ -178,7 +192,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -728,14 +742,26 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
+  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName 
serverName) {
     return this.<List<HRegionInfo>> newAdminCaller()
         .action((controller, stub) -> this
             .<GetOnlineRegionRequest, GetOnlineRegionResponse, 
List<HRegionInfo>> adminCall(
               controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
               (s, c, req, done) -> s.getOnlineRegion(c, req, done),
               resp -> ProtobufUtil.getRegionInfos(resp)))
-        .serverName(sn).call();
+        .serverName(serverName).call();
+  }
+
+  @Override
+  public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName 
tableName) {
+    if (tableName.equals(META_TABLE_NAME)) {
+      return connection.getLocator().getRegionLocation(tableName, null, null, 
operationTimeoutNs)
+          .thenApply(loc -> Arrays.asList(loc.getRegionInfo()));
+    } else {
+      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, 
Optional.of(tableName))
+          .thenApply(
+            locs -> locs.stream().map(loc -> 
loc.getRegionInfo()).collect(Collectors.toList()));
+    }
   }
 
   @Override
@@ -2275,4 +2301,189 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
     return false;
   }
-}
+
+  @Override
+  public CompletableFuture<ClusterStatus> getClusterStatus() {
+    return this
+        .<ClusterStatus> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetClusterStatusRequest, GetClusterStatusResponse, 
ClusterStatus> call(controller,
+                stub, RequestConverter.buildGetClusterStatusRequest(),
+                (s, c, req, done) -> s.getClusterStatus(c, req, done),
+                resp -> ProtobufUtil.convert(resp.getClusterStatus()))).call();
+  }
+
+  @Override
+  public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName 
serverName,
+      Optional<TableName> tableName) {
+    return this
+        .<List<RegionLoad>> newAdminCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionLoad>> 
adminCall(
+                controller, stub, 
RequestConverter.buildGetRegionLoadRequest(tableName), (s, c,
+                    req, done) -> s.getRegionLoad(controller, req, done),
+                
ProtobufUtil::getRegionLoadInfo)).serverName(serverName).call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
+    return this
+        .<Boolean> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 
Boolean> call(controller,
+                stub, IsInMaintenanceModeRequest.newBuilder().build(),
+                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
+                resp -> resp.getInMaintenanceMode())).call();
+  }
+
+  @Override
+  public CompletableFuture<CompactionState> getCompactionState(TableName 
tableName) {
+    CompletableFuture<CompactionState> future = new CompletableFuture<>();
+    getTableHRegionLocations(tableName).whenComplete(
+      (locations, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        List<CompactionState> regionStates = new ArrayList<>();
+        List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
+        locations.stream().filter(loc -> loc.getServerName() != null)
+            .filter(loc -> loc.getRegionInfo() != null)
+            .filter(loc -> !loc.getRegionInfo().isOffline())
+            .map(loc -> loc.getRegionInfo().getRegionName()).forEach(region -> 
{
+              
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, 
err2) -> {
+                // If any region compaction state is MAJOR_AND_MINOR
+                // the table compaction state is MAJOR_AND_MINOR, too.
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
+
+                  future.complete(regionState);
+                } else {
+                  regionStates.add(regionState);
+                }
+              }));
+            });
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[futures.size()]))
+            .whenComplete((ret, err3) -> {
+              // If future not completed, check all regions's compaction state
+              if (!future.isCompletedExceptionally() && !future.isDone()) {
+                CompactionState state = CompactionState.NONE;
+                for (CompactionState regionState : regionStates) {
+                  switch (regionState) {
+                  case MAJOR:
+                    if (state == CompactionState.MINOR) {
+                      future.complete(CompactionState.MAJOR_AND_MINOR);
+                    } else {
+                      state = CompactionState.MAJOR;
+                    }
+                    break;
+                  case MINOR:
+                    if (state == CompactionState.MAJOR) {
+                      future.complete(CompactionState.MAJOR_AND_MINOR);
+                    } else {
+                      state = CompactionState.MINOR;
+                    }
+                    break;
+                  case NONE:
+                  default:
+                  }
+                  if (!future.isDone()) {
+                    future.complete(state);
+                  }
+                }
+              }
+            });
+      });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] 
regionName) {
+    CompletableFuture<CompactionState> future = new CompletableFuture<>();
+    getRegionLocation(regionName).whenComplete(
+      (location, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        ServerName serverName = location.getServerName();
+        if (serverName == null) {
+          future.completeExceptionally(new NoServerForRegionException(Bytes
+              .toStringBinary(regionName)));
+          return;
+        }
+        this.<GetRegionInfoResponse> newAdminCaller()
+            .action(
+              (controller, stub) -> this
+                  .<GetRegionInfoRequest, GetRegionInfoResponse, 
GetRegionInfoResponse> adminCall(
+                    controller, stub, 
RequestConverter.buildGetRegionInfoRequest(location
+                        .getRegionInfo().getRegionName(), true), (s, c, req, 
done) -> s
+                        .getRegionInfo(controller, req, done), resp -> resp))
+            .serverName(serverName).call().whenComplete((resp2, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                if (resp2.hasCompactionState()) {
+                  
future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+                } else {
+                  future.complete(CompactionState.NONE);
+                }
+              }
+            });
+      });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Optional<Long>> 
getLastMajorCompactionTimestamp(TableName tableName) {
+    MajorCompactionTimestampRequest request =
+        MajorCompactionTimestampRequest.newBuilder()
+            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
+    return this
+        .<Optional<Long>> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<MajorCompactionTimestampRequest, 
MajorCompactionTimestampResponse, Optional<Long>> call(
+                controller, stub, request,
+                (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, 
done),
+                ProtobufUtil::toOptionalTimestamp)).call();
+  }
+
+  @Override
+  public CompletableFuture<Optional<Long>> 
getLastMajorCompactionTimestampForRegion(
+      byte[] regionName) {
+    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+    // regionName may be a full region name or encoded region name, so 
getRegionInfo(byte[]) first
+    getRegionInfo(regionName)
+        .whenComplete(
+          (region, err) -> {
+            if (err != null) {
+              future.completeExceptionally(err);
+              return;
+            }
+            MajorCompactionTimestampForRegionRequest.Builder builder =
+                MajorCompactionTimestampForRegionRequest.newBuilder();
+            builder.setRegion(RequestConverter.buildRegionSpecifier(
+              RegionSpecifierType.REGION_NAME, regionName));
+            this.<Optional<Long>> newMasterCaller()
+                .action(
+                  (controller, stub) -> this
+                      .<MajorCompactionTimestampForRegionRequest, 
MajorCompactionTimestampResponse, Optional<Long>> call(
+                        controller, stub, builder.build(), (s, c, req, done) 
-> s
+                            .getLastMajorCompactionTimestampForRegion(c, req, 
done),
+                        ProtobufUtil::toOptionalTimestamp)).call()
+                .whenComplete((timestamp, err2) -> {
+                  if (err2 != null) {
+                    future.completeExceptionally(err2);
+                  } else {
+                    future.complete(timestamp);
+                  }
+                });
+          });
+    return future;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 2bb2994..eebe4bd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -165,6 +165,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedure;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
@@ -1806,7 +1807,8 @@ public final class ProtobufUtil {
   public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoad(
       final RpcController controller, final AdminService.BlockingInterface 
admin,
       final TableName tableName) throws IOException {
-    GetRegionLoadRequest request = 
RequestConverter.buildGetRegionLoadRequest(tableName);
+    GetRegionLoadRequest request =
+        
RequestConverter.buildGetRegionLoadRequest(Optional.ofNullable(tableName));
     GetRegionLoadResponse response;
     try {
       response = admin.getRegionLoad(controller, request);
@@ -1816,7 +1818,7 @@ public final class ProtobufUtil {
     return getRegionLoadInfo(response);
   }
 
-  static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
+  public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
       GetRegionLoadResponse regionLoadResponse) {
     List<org.apache.hadoop.hbase.RegionLoad> regionLoadList =
         new ArrayList<>(regionLoadResponse.getRegionLoadsCount());
@@ -3066,6 +3068,11 @@ public final class ProtobufUtil {
     return CompactionState.valueOf(state.toString());
   }
 
+  public static Optional<Long> 
toOptionalTimestamp(MajorCompactionTimestampResponse resp) {
+    long timestamp = resp.getCompactionTimestamp();
+    return timestamp == 0 ? Optional.empty() : Optional.of(timestamp);
+  }
+
   /**
    * Creates {@link 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type}
    * from {@link SnapshotType}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index dff9116..a74d737 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -796,15 +796,23 @@ public final class RequestConverter {
 
   /**
    * Create a protocol buffer GetRegionLoadRequest for all regions/regions of 
a table.
-   *
    * @param tableName the table for which regionLoad should be obtained from RS
    * @return a protocol buffer GetRegionLoadRequest
+   * @deprecated use {@link #buildGetRegionLoadRequest(Optional)} instead.
    */
+  @Deprecated
   public static GetRegionLoadRequest buildGetRegionLoadRequest(final TableName 
tableName) {
+    return buildGetRegionLoadRequest(Optional.ofNullable(tableName));
+  }
+
+  /**
+   * Create a protocol buffer GetRegionLoadRequest for all regions/regions of 
a table.
+   * @param tableName the table for which regionLoad should be obtained from RS
+   * @return a protocol buffer GetRegionLoadRequest
+   */
+  public static GetRegionLoadRequest 
buildGetRegionLoadRequest(Optional<TableName> tableName) {
     GetRegionLoadRequest.Builder builder = GetRegionLoadRequest.newBuilder();
-    if (tableName != null) {
-      builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
-    }
+    tableName.ifPresent(table -> 
builder.setTableName(ProtobufUtil.toProtoTableName(table)));
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
new file mode 100644
index 0000000..e8f6380
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+@Category({ MiscTests.class, MediumTests.class })
+public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
+
+  @Test
+  public void testRegionLoad() throws Exception {
+    // Turn off the balancer
+    admin.setBalancerOn(false).join();
+    TableName[] tables =
+        new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
+            TableName.valueOf(tableName.getNameAsString() + "2"),
+            TableName.valueOf(tableName.getNameAsString() + "3") };
+    createAndLoadTable(tables);
+    // Check if regions match with the regionLoad from the server
+    Collection<ServerName> servers = admin.getRegionServers().get();
+    for (ServerName serverName : servers) {
+      List<HRegionInfo> regions = admin.getOnlineRegions(serverName).get();
+      checkRegionsAndRegionLoads(regions, 
admin.getRegionLoads(serverName).get());
+    }
+
+    // Check if regionLoad matches the table's regions and nothing is missed
+    for (TableName table : tables) {
+      List<HRegionInfo> tableRegions = admin.getTableRegions(table).get();
+      List<RegionLoad> regionLoads = Lists.newArrayList();
+      for (ServerName serverName : servers) {
+        regionLoads.addAll(admin.getRegionLoads(serverName, 
Optional.of(table)).get());
+      }
+      checkRegionsAndRegionLoads(tableRegions, regionLoads);
+    }
+
+    // Check RegionLoad matches the regionLoad from ClusterStatus
+    ClusterStatus clusterStatus = admin.getClusterStatus().get();
+    for (ServerName serverName : clusterStatus.getServers()) {
+      ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+      compareRegionLoads(serverLoad.getRegionsLoad().values(), 
admin.getRegionLoads(serverName)
+          .get());
+    }
+  }
+
+  private void compareRegionLoads(Collection<RegionLoad> regionLoadCluster,
+      Collection<RegionLoad> regionLoads) {
+
+    assertEquals("No of regionLoads from clusterStatus and regionloads from RS 
doesn't match",
+      regionLoadCluster.size(), regionLoads.size());
+
+    for (RegionLoad loadCluster : regionLoadCluster) {
+      boolean matched = false;
+      for (RegionLoad load : regionLoads) {
+        if (Bytes.equals(loadCluster.getName(), load.getName())) {
+          matched = true;
+          continue;
+        }
+      }
+      assertTrue("The contents of region load from cluster and server should 
match", matched);
+    }
+  }
+
+  private void checkRegionsAndRegionLoads(Collection<HRegionInfo> regions,
+      Collection<RegionLoad> regionLoads) {
+
+    assertEquals("No of regions and regionloads doesn't match", 
regions.size(), regionLoads.size());
+
+    Map<byte[], RegionLoad> regionLoadMap = 
Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    for (RegionLoad regionLoad : regionLoads) {
+      regionLoadMap.put(regionLoad.getName(), regionLoad);
+    }
+    for (HRegionInfo info : regions) {
+      assertTrue("Region not in regionLoadMap region:" + 
info.getRegionNameAsString()
+          + " regionMap: " + regionLoadMap, 
regionLoadMap.containsKey(info.getRegionName()));
+    }
+  }
+
+  private void createAndLoadTable(TableName[] tables) {
+    for (TableName table : tables) {
+      TableDescriptorBuilder builder = 
TableDescriptorBuilder.newBuilder(table);
+      
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), 
Bytes.toBytes("zzzzz"), 16).join();
+      RawAsyncTable asyncTable = ASYNC_CONN.getRawTable(table);
+      List<Put> puts = new ArrayList<>();
+      for (byte[] row : HBaseTestingUtility.ROWS) {
+        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), 
Bytes.toBytes("v")));
+      }
+      asyncTable.putAll(puts).join();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index 7c8b236..7752d37 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -515,10 +515,10 @@ public class TestAsyncRegionAdminApi extends 
TestAsyncAdminBase {
     long curt = System.currentTimeMillis();
     long waitTime = 5000;
     long endt = curt + waitTime;
-    CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+    CompactionState state = admin.getCompactionState(tableName).get();
     while (state == CompactionState.NONE && curt < endt) {
       Thread.sleep(10);
-      state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+      state = admin.getCompactionState(tableName).get();
       curt = System.currentTimeMillis();
     }
     // Now, should have the right compaction state,
@@ -530,10 +530,10 @@ public class TestAsyncRegionAdminApi extends 
TestAsyncAdminBase {
       }
     } else {
       // Wait until the compaction is done
-      state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+      state = admin.getCompactionState(tableName).get();
       while (state != CompactionState.NONE && curt < endt) {
         Thread.sleep(10);
-        state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+        state = admin.getCompactionState(tableName).get();
       }
       // Now, compaction should be done.
       assertEquals(CompactionState.NONE, state);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index f75c346..f2db244 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -774,4 +775,82 @@ public class TestAsyncTableAdminApi extends 
TestAsyncAdminBase {
     boolean tableAvailable = admin.isTableAvailable(tableName, 
splitKeys).get();
     assertFalse("Table should be created with 1 row in META", tableAvailable);
   }
-}
+
+  @Test
+  public void testCompactionTimestamps() throws Exception {
+    createTableWithDefaultConf(tableName);
+    RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
+    Optional<Long> ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+    assertFalse(ts.isPresent());
+    Put p = new Put(Bytes.toBytes("row1"));
+    p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
+    table.put(p).join();
+    ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+    // no files written -> no data
+    assertFalse(ts.isPresent());
+
+    admin.flush(tableName).join();
+    ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+    // still 0, we flushed a file, but no major compaction happened
+    assertFalse(ts.isPresent());
+
+    byte[] regionName =
+        
ASYNC_CONN.getRegionLocator(tableName).getRegionLocation(Bytes.toBytes("row1")).get()
+            .getRegionInfo().getRegionName();
+    Optional<Long> ts1 = 
admin.getLastMajorCompactionTimestampForRegion(regionName).get();
+    assertFalse(ts1.isPresent());
+    p = new Put(Bytes.toBytes("row2"));
+    p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
+    table.put(p).join();
+    admin.flush(tableName).join();
+    ts1 = admin.getLastMajorCompactionTimestamp(tableName).get();
+    // make sure the region API returns the same value, as the old file is 
still around
+    assertFalse(ts1.isPresent());
+
+    for (int i = 0; i < 3; i++) {
+      table.put(p).join();
+      admin.flush(tableName).join();
+    }
+    admin.majorCompact(tableName).join();
+    long curt = System.currentTimeMillis();
+    long waitTime = 10000;
+    long endt = curt + waitTime;
+    CompactionState state = admin.getCompactionState(tableName).get();
+    LOG.info("Current compaction state 1 is " + state);
+    while (state == CompactionState.NONE && curt < endt) {
+      Thread.sleep(100);
+      state = admin.getCompactionState(tableName).get();
+      curt = System.currentTimeMillis();
+      LOG.info("Current compaction state 2 is " + state);
+    }
+    // Now, should have the right compaction state, let's wait until the 
compaction is done
+    if (state == CompactionState.MAJOR) {
+      state = admin.getCompactionState(tableName).get();
+      LOG.info("Current compaction state 3 is " + state);
+      while (state != CompactionState.NONE && curt < endt) {
+        Thread.sleep(10);
+        state = admin.getCompactionState(tableName).get();
+        LOG.info("Current compaction state 4 is " + state);
+      }
+    }
+    // Sleep to wait region server report
+    
Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval",
 3 * 1000) * 2);
+
+    ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+    // after a compaction our earliest timestamp will have progressed forward
+    assertTrue(ts.isPresent());
+    assertTrue(ts.get() > 0);
+    // region api still the same
+    ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
+    assertTrue(ts1.isPresent());
+    assertEquals(ts.get(), ts1.get());
+    table.put(p).join();
+    admin.flush(tableName).join();
+    ts = admin.getLastMajorCompactionTimestamp(tableName).join();
+    assertTrue(ts.isPresent());
+    assertEquals(ts.get(), ts1.get());
+    ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
+    assertTrue(ts1.isPresent());
+    assertEquals(ts.get(), ts1.get());
+  }
+}
\ No newline at end of file

Reply via email to