HBASE-18319 Implement getClusterStatus/getRegionLoad/getCompactionState/getLastMajorCompactionTimestamp methods
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0a5fa0c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0a5fa0c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0a5fa0c Branch: refs/heads/HBASE-14070.HLC Commit: b0a5fa0c2a119168c4272e5efba16a3ef9e9c329 Parents: 4fe7385 Author: Guanghao Zhang <zg...@apache.org> Authored: Wed Jul 5 18:33:57 2017 +0800 Committer: Guanghao Zhang <zg...@apache.org> Committed: Fri Jul 7 16:21:45 2017 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/AsyncAdmin.java | 95 ++++++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 44 ++++ .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 219 ++++++++++++++++++- .../hbase/shaded/protobuf/ProtobufUtil.java | 11 +- .../hbase/shaded/protobuf/RequestConverter.java | 16 +- .../hbase/client/TestAsyncClusterAdminApi.java | 132 +++++++++++ .../hbase/client/TestAsyncRegionAdminApi.java | 8 +- .../hbase/client/TestAsyncTableAdminApi.java | 81 ++++++- 8 files changed, 591 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index ff35d46..8ade209 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import java.util.List; import java.util.Collection; import java.util.Map; @@ -24,8 +25,10 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; +import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; @@ -332,6 +335,11 @@ public interface AsyncAdmin { CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName); /** + * Get the regions of a given table. + */ + CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName); + + /** * Flush a table. * @param tableName table to flush */ @@ -796,4 +804,91 @@ public interface AsyncAdmin { * @return procedure list wrapped by {@link CompletableFuture} */ CompletableFuture<List<ProcedureInfo>> listProcedures(); + + /** + * @return cluster status wrapped by {@link CompletableFuture} + */ + CompletableFuture<ClusterStatus> getClusterStatus(); + + /** + * @return current master server name wrapped by {@link CompletableFuture} + */ + default CompletableFuture<ServerName> getMaster() { + return getClusterStatus().thenApply(ClusterStatus::getMaster); + } + + /** + * @return current backup master list wrapped by {@link CompletableFuture} + */ + default CompletableFuture<Collection<ServerName>> getBackupMasters() { + return getClusterStatus().thenApply(ClusterStatus::getBackupMasters); + } + + /** + * @return current live region servers list wrapped by {@link CompletableFuture} + */ + default CompletableFuture<Collection<ServerName>> getRegionServers() { + return getClusterStatus().thenApply(ClusterStatus::getServers); + } + + /** + * Get a list of {@link RegionLoad} of all regions hosted on a region seerver. + * @param serverName + * @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture} + */ + default CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName) { + return getRegionLoads(serverName, Optional.empty()); + } + + /** + * Get a list of {@link RegionLoad} of all regions hosted on a region seerver for a table. + * @param serverName + * @param tableName + * @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture} + */ + CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName, + Optional<TableName> tableName); + + /** + * Check whether master is in maintenance mode + * @return true if master is in maintenance mode, false otherwise. The return value will be + * wrapped by a {@link CompletableFuture} + */ + CompletableFuture<Boolean> isMasterInMaintenanceMode(); + + /** + * Get the current compaction state of a table. It could be in a major compaction, a minor + * compaction, both, or none. + * @param tableName table to examine + * @return the current compaction state wrapped by a {@link CompletableFuture} + */ + CompletableFuture<CompactionState> getCompactionState(TableName tableName); + + /** + * Get the current compaction state of region. It could be in a major compaction, a minor + * compaction, both, or none. + * @param regionName region to examine + * @return the current compaction state wrapped by a {@link CompletableFuture} + */ + CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName); + + /** + * Get the timestamp of the last major compaction for the passed table. + * <p> + * The timestamp of the oldest HFile resulting from a major compaction of that table, or not + * present if no such HFile could be found. + * @param tableName table to examine + * @return the last major compaction timestamp wrapped by a {@link CompletableFuture} + */ + CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName); + + /** + * Get the timestamp of the last major compaction for the passed region. + * <p> + * The timestamp of the oldest HFile resulting from a major compaction of that region, or not + * present if no such HFile could be found. + * @param regionName region to examine + * @return the last major compaction timestamp wrapped by a {@link CompletableFuture} + */ + CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(byte[] regionName); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 36fd60d..2998133 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -27,8 +27,10 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; @@ -225,6 +227,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) { + return wrap(rawAdmin.getTableRegions(tableName)); + } + + @Override public CompletableFuture<Void> flush(TableName tableName) { return wrap(rawAdmin.flush(tableName)); } @@ -445,4 +452,41 @@ public class AsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture<List<ProcedureInfo>> listProcedures() { return wrap(rawAdmin.listProcedures()); } + + @Override + public CompletableFuture<ClusterStatus> getClusterStatus() { + return wrap(rawAdmin.getClusterStatus()); + } + + @Override + public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName, + Optional<TableName> tableName) { + return wrap(rawAdmin.getRegionLoads(serverName, tableName)); + } + + @Override + public CompletableFuture<Boolean> isMasterInMaintenanceMode() { + return wrap(rawAdmin.isMasterInMaintenanceMode()); + } + + @Override + public CompletableFuture<CompactionState> getCompactionState(TableName tableName) { + return wrap(rawAdmin.getCompactionState(tableName)); + } + + @Override + public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { + return wrap(rawAdmin.getCompactionStateForRegion(regionName)); + } + + @Override + public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { + return wrap(rawAdmin.getLastMajorCompactionTimestamp(tableName)); + } + + @Override + public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion( + byte[] regionName) { + return wrap(rawAdmin.getLastMajorCompactionTimestampForRegion(regionName)); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 179fd7d..b119754 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -46,12 +46,14 @@ import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -89,10 +91,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; @@ -115,6 +122,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; @@ -133,6 +142,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; @@ -141,6 +152,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; @@ -178,7 +192,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.*; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; @@ -728,14 +742,26 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) { + public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) { return this.<List<HRegionInfo>> newAdminCaller() .action((controller, stub) -> this .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall( controller, stub, RequestConverter.buildGetOnlineRegionRequest(), (s, c, req, done) -> s.getOnlineRegion(c, req, done), resp -> ProtobufUtil.getRegionInfos(resp))) - .serverName(sn).call(); + .serverName(serverName).call(); + } + + @Override + public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) { + if (tableName.equals(META_TABLE_NAME)) { + return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs) + .thenApply(loc -> Arrays.asList(loc.getRegionInfo())); + } else { + return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)) + .thenApply( + locs -> locs.stream().map(loc -> loc.getRegionInfo()).collect(Collectors.toList())); + } } @Override @@ -2275,4 +2301,189 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } return false; } -} + + @Override + public CompletableFuture<ClusterStatus> getClusterStatus() { + return this + .<ClusterStatus> newMasterCaller() + .action( + (controller, stub) -> this + .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterStatus> call(controller, + stub, RequestConverter.buildGetClusterStatusRequest(), + (s, c, req, done) -> s.getClusterStatus(c, req, done), + resp -> ProtobufUtil.convert(resp.getClusterStatus()))).call(); + } + + @Override + public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName, + Optional<TableName> tableName) { + return this + .<List<RegionLoad>> newAdminCaller() + .action( + (controller, stub) -> this + .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionLoad>> adminCall( + controller, stub, RequestConverter.buildGetRegionLoadRequest(tableName), (s, c, + req, done) -> s.getRegionLoad(controller, req, done), + ProtobufUtil::getRegionLoadInfo)).serverName(serverName).call(); + } + + @Override + public CompletableFuture<Boolean> isMasterInMaintenanceMode() { + return this + .<Boolean> newMasterCaller() + .action( + (controller, stub) -> this + .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller, + stub, IsInMaintenanceModeRequest.newBuilder().build(), + (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), + resp -> resp.getInMaintenanceMode())).call(); + } + + @Override + public CompletableFuture<CompactionState> getCompactionState(TableName tableName) { + CompletableFuture<CompactionState> future = new CompletableFuture<>(); + getTableHRegionLocations(tableName).whenComplete( + (locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List<CompactionState> regionStates = new ArrayList<>(); + List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); + locations.stream().filter(loc -> loc.getServerName() != null) + .filter(loc -> loc.getRegionInfo() != null) + .filter(loc -> !loc.getRegionInfo().isOffline()) + .map(loc -> loc.getRegionInfo().getRegionName()).forEach(region -> { + futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { + // If any region compaction state is MAJOR_AND_MINOR + // the table compaction state is MAJOR_AND_MINOR, too. + if (err2 != null) { + future.completeExceptionally(err2); + } else if (regionState == CompactionState.MAJOR_AND_MINOR) { + + future.complete(regionState); + } else { + regionStates.add(regionState); + } + })); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) + .whenComplete((ret, err3) -> { + // If future not completed, check all regions's compaction state + if (!future.isCompletedExceptionally() && !future.isDone()) { + CompactionState state = CompactionState.NONE; + for (CompactionState regionState : regionStates) { + switch (regionState) { + case MAJOR: + if (state == CompactionState.MINOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MAJOR; + } + break; + case MINOR: + if (state == CompactionState.MAJOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MINOR; + } + break; + case NONE: + default: + } + if (!future.isDone()) { + future.complete(state); + } + } + } + }); + }); + return future; + } + + @Override + public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { + CompletableFuture<CompactionState> future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete( + (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future.completeExceptionally(new NoServerForRegionException(Bytes + .toStringBinary(regionName))); + return; + } + this.<GetRegionInfoResponse> newAdminCaller() + .action( + (controller, stub) -> this + .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( + controller, stub, RequestConverter.buildGetRegionInfoRequest(location + .getRegionInfo().getRegionName(), true), (s, c, req, done) -> s + .getRegionInfo(controller, req, done), resp -> resp)) + .serverName(serverName).call().whenComplete((resp2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + if (resp2.hasCompactionState()) { + future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); + } else { + future.complete(CompactionState.NONE); + } + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { + MajorCompactionTimestampRequest request = + MajorCompactionTimestampRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); + return this + .<Optional<Long>> newMasterCaller() + .action( + (controller, stub) -> this + .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call( + controller, stub, request, + (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), + ProtobufUtil::toOptionalTimestamp)).call(); + } + + @Override + public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion( + byte[] regionName) { + CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); + // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first + getRegionInfo(regionName) + .whenComplete( + (region, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + MajorCompactionTimestampForRegionRequest.Builder builder = + MajorCompactionTimestampForRegionRequest.newBuilder(); + builder.setRegion(RequestConverter.buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionName)); + this.<Optional<Long>> newMasterCaller() + .action( + (controller, stub) -> this + .<MajorCompactionTimestampForRegionRequest, MajorCompactionTimestampResponse, Optional<Long>> call( + controller, stub, builder.build(), (s, c, req, done) -> s + .getLastMajorCompactionTimestampForRegion(c, req, done), + ProtobufUtil::toOptionalTimestamp)).call() + .whenComplete((timestamp, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(timestamp); + } + }); + }); + return future; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 2bb2994..eebe4bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -165,6 +165,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedure; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; @@ -1806,7 +1807,8 @@ public final class ProtobufUtil { public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoad( final RpcController controller, final AdminService.BlockingInterface admin, final TableName tableName) throws IOException { - GetRegionLoadRequest request = RequestConverter.buildGetRegionLoadRequest(tableName); + GetRegionLoadRequest request = + RequestConverter.buildGetRegionLoadRequest(Optional.ofNullable(tableName)); GetRegionLoadResponse response; try { response = admin.getRegionLoad(controller, request); @@ -1816,7 +1818,7 @@ public final class ProtobufUtil { return getRegionLoadInfo(response); } - static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo( + public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo( GetRegionLoadResponse regionLoadResponse) { List<org.apache.hadoop.hbase.RegionLoad> regionLoadList = new ArrayList<>(regionLoadResponse.getRegionLoadsCount()); @@ -3066,6 +3068,11 @@ public final class ProtobufUtil { return CompactionState.valueOf(state.toString()); } + public static Optional<Long> toOptionalTimestamp(MajorCompactionTimestampResponse resp) { + long timestamp = resp.getCompactionTimestamp(); + return timestamp == 0 ? Optional.empty() : Optional.of(timestamp); + } + /** * Creates {@link org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type} * from {@link SnapshotType} http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index dff9116..a74d737 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -796,15 +796,23 @@ public final class RequestConverter { /** * Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table. - * * @param tableName the table for which regionLoad should be obtained from RS * @return a protocol buffer GetRegionLoadRequest + * @deprecated use {@link #buildGetRegionLoadRequest(Optional)} instead. */ + @Deprecated public static GetRegionLoadRequest buildGetRegionLoadRequest(final TableName tableName) { + return buildGetRegionLoadRequest(Optional.ofNullable(tableName)); + } + + /** + * Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table. + * @param tableName the table for which regionLoad should be obtained from RS + * @return a protocol buffer GetRegionLoadRequest + */ + public static GetRegionLoadRequest buildGetRegionLoadRequest(Optional<TableName> tableName) { GetRegionLoadRequest.Builder builder = GetRegionLoadRequest.newBuilder(); - if (tableName != null) { - builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); - } + tableName.ifPresent(table -> builder.setTableName(ProtobufUtil.toProtoTableName(table))); return builder.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java new file mode 100644 index 0000000..e8f6380 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +@RunWith(Parameterized.class) +@Category({ MiscTests.class, MediumTests.class }) +public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { + + @Test + public void testRegionLoad() throws Exception { + // Turn off the balancer + admin.setBalancerOn(false).join(); + TableName[] tables = + new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"), + TableName.valueOf(tableName.getNameAsString() + "2"), + TableName.valueOf(tableName.getNameAsString() + "3") }; + createAndLoadTable(tables); + // Check if regions match with the regionLoad from the server + Collection<ServerName> servers = admin.getRegionServers().get(); + for (ServerName serverName : servers) { + List<HRegionInfo> regions = admin.getOnlineRegions(serverName).get(); + checkRegionsAndRegionLoads(regions, admin.getRegionLoads(serverName).get()); + } + + // Check if regionLoad matches the table's regions and nothing is missed + for (TableName table : tables) { + List<HRegionInfo> tableRegions = admin.getTableRegions(table).get(); + List<RegionLoad> regionLoads = Lists.newArrayList(); + for (ServerName serverName : servers) { + regionLoads.addAll(admin.getRegionLoads(serverName, Optional.of(table)).get()); + } + checkRegionsAndRegionLoads(tableRegions, regionLoads); + } + + // Check RegionLoad matches the regionLoad from ClusterStatus + ClusterStatus clusterStatus = admin.getClusterStatus().get(); + for (ServerName serverName : clusterStatus.getServers()) { + ServerLoad serverLoad = clusterStatus.getLoad(serverName); + compareRegionLoads(serverLoad.getRegionsLoad().values(), admin.getRegionLoads(serverName) + .get()); + } + } + + private void compareRegionLoads(Collection<RegionLoad> regionLoadCluster, + Collection<RegionLoad> regionLoads) { + + assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match", + regionLoadCluster.size(), regionLoads.size()); + + for (RegionLoad loadCluster : regionLoadCluster) { + boolean matched = false; + for (RegionLoad load : regionLoads) { + if (Bytes.equals(loadCluster.getName(), load.getName())) { + matched = true; + continue; + } + } + assertTrue("The contents of region load from cluster and server should match", matched); + } + } + + private void checkRegionsAndRegionLoads(Collection<HRegionInfo> regions, + Collection<RegionLoad> regionLoads) { + + assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size()); + + Map<byte[], RegionLoad> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + for (RegionLoad regionLoad : regionLoads) { + regionLoadMap.put(regionLoad.getName(), regionLoad); + } + for (HRegionInfo info : regions) { + assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString() + + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName())); + } + } + + private void createAndLoadTable(TableName[] tables) { + for (TableName table : tables) { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()); + admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join(); + RawAsyncTable asyncTable = ASYNC_CONN.getRawTable(table); + List<Put> puts = new ArrayList<>(); + for (byte[] row : HBaseTestingUtility.ROWS) { + puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"))); + } + asyncTable.putAll(puts).join(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 7c8b236..7752d37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -515,10 +515,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { long curt = System.currentTimeMillis(); long waitTime = 5000; long endt = curt + waitTime; - CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName); + CompactionState state = admin.getCompactionState(tableName).get(); while (state == CompactionState.NONE && curt < endt) { Thread.sleep(10); - state = TEST_UTIL.getAdmin().getCompactionState(tableName); + state = admin.getCompactionState(tableName).get(); curt = System.currentTimeMillis(); } // Now, should have the right compaction state, @@ -530,10 +530,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { } } else { // Wait until the compaction is done - state = TEST_UTIL.getAdmin().getCompactionState(tableName); + state = admin.getCompactionState(tableName).get(); while (state != CompactionState.NONE && curt < endt) { Thread.sleep(10); - state = TEST_UTIL.getAdmin().getCompactionState(tableName); + state = admin.getCompactionState(tableName).get(); } // Now, compaction should be done. assertEquals(CompactionState.NONE, state); http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java index f75c346..f2db244 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java @@ -38,6 +38,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -774,4 +775,82 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get(); assertFalse("Table should be created with 1 row in META", tableAvailable); } -} + + @Test + public void testCompactionTimestamps() throws Exception { + createTableWithDefaultConf(tableName); + RawAsyncTable table = ASYNC_CONN.getRawTable(tableName); + Optional<Long> ts = admin.getLastMajorCompactionTimestamp(tableName).get(); + assertFalse(ts.isPresent()); + Put p = new Put(Bytes.toBytes("row1")); + p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")); + table.put(p).join(); + ts = admin.getLastMajorCompactionTimestamp(tableName).get(); + // no files written -> no data + assertFalse(ts.isPresent()); + + admin.flush(tableName).join(); + ts = admin.getLastMajorCompactionTimestamp(tableName).get(); + // still 0, we flushed a file, but no major compaction happened + assertFalse(ts.isPresent()); + + byte[] regionName = + ASYNC_CONN.getRegionLocator(tableName).getRegionLocation(Bytes.toBytes("row1")).get() + .getRegionInfo().getRegionName(); + Optional<Long> ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get(); + assertFalse(ts1.isPresent()); + p = new Put(Bytes.toBytes("row2")); + p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")); + table.put(p).join(); + admin.flush(tableName).join(); + ts1 = admin.getLastMajorCompactionTimestamp(tableName).get(); + // make sure the region API returns the same value, as the old file is still around + assertFalse(ts1.isPresent()); + + for (int i = 0; i < 3; i++) { + table.put(p).join(); + admin.flush(tableName).join(); + } + admin.majorCompact(tableName).join(); + long curt = System.currentTimeMillis(); + long waitTime = 10000; + long endt = curt + waitTime; + CompactionState state = admin.getCompactionState(tableName).get(); + LOG.info("Current compaction state 1 is " + state); + while (state == CompactionState.NONE && curt < endt) { + Thread.sleep(100); + state = admin.getCompactionState(tableName).get(); + curt = System.currentTimeMillis(); + LOG.info("Current compaction state 2 is " + state); + } + // Now, should have the right compaction state, let's wait until the compaction is done + if (state == CompactionState.MAJOR) { + state = admin.getCompactionState(tableName).get(); + LOG.info("Current compaction state 3 is " + state); + while (state != CompactionState.NONE && curt < endt) { + Thread.sleep(10); + state = admin.getCompactionState(tableName).get(); + LOG.info("Current compaction state 4 is " + state); + } + } + // Sleep to wait region server report + Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); + + ts = admin.getLastMajorCompactionTimestamp(tableName).get(); + // after a compaction our earliest timestamp will have progressed forward + assertTrue(ts.isPresent()); + assertTrue(ts.get() > 0); + // region api still the same + ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get(); + assertTrue(ts1.isPresent()); + assertEquals(ts.get(), ts1.get()); + table.put(p).join(); + admin.flush(tableName).join(); + ts = admin.getLastMajorCompactionTimestamp(tableName).join(); + assertTrue(ts.isPresent()); + assertEquals(ts.get(), ts1.get()); + ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get(); + assertTrue(ts1.isPresent()); + assertEquals(ts.get(), ts1.get()); + } +} \ No newline at end of file