Repository: hbase Updated Branches: refs/heads/branch-2 144795684 -> e946d9d84
HBASE-19242 Add MOB compact support for AsyncAdmin Signed-off-by: Michael Stack <st...@apache.org> Signed-off-by: Guanghao Zhang <zghao...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e946d9d8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e946d9d8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e946d9d8 Branch: refs/heads/branch-2 Commit: e946d9d841a72d41c0c7458667c7fedc495bd306 Parents: 1447956 Author: Balazs Meszaros <balazs.mesza...@cloudera.com> Authored: Thu Nov 23 14:42:39 2017 +0100 Committer: Michael Stack <st...@apache.org> Committed: Tue Nov 28 15:04:56 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Admin.java | 130 +++++------ .../apache/hadoop/hbase/client/AsyncAdmin.java | 69 +++++- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 24 +- .../apache/hadoop/hbase/client/HBaseAdmin.java | 13 +- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 232 ++++++++++++------- .../apache/hadoop/hbase/client/RegionInfo.java | 11 + .../hbase/client/TestAsyncRegionAdminApi.java | 38 +++ 7 files changed, 341 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e946d9d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 6f1190e..d9f8e899 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -872,6 +872,33 @@ public interface Admin extends Abortable, Closeable { throws IOException; /** + * Compact a table. 
Asynchronous operation in that this method requests that a + * Compaction run and then it returns. It does not wait on the completion of Compaction + * (it can take a while). + * + * @param tableName table to compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if a remote or network exception occurs + * @throws InterruptedException + */ + void compact(TableName tableName, CompactType compactType) + throws IOException, InterruptedException; + + /** + * Compact a column family within a table. Asynchronous operation in that this method + * requests that a Compaction run and then it returns. It does not wait on the + * completion of Compaction (it can take a while). + * + * @param tableName table to compact + * @param columnFamily column family within a table + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if not a mob column family or if a remote or network exception occurs + * @throws InterruptedException + */ + void compact(TableName tableName, byte[] columnFamily, CompactType compactType) + throws IOException, InterruptedException; + + /** * Major compact a table. Asynchronous operation in that this method requests * that a Compaction run and then it returns. It does not wait on the completion of Compaction * (it can take a while). @@ -916,6 +943,33 @@ public interface Admin extends Abortable, Closeable { throws IOException; /** + * Major compact a table. Asynchronous operation in that this method requests that a + * Compaction run and then it returns. It does not wait on the completion of Compaction + * (it can take a while). 
+ * + * @param tableName table to compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if a remote or network exception occurs + * @throws InterruptedException + */ + void majorCompact(TableName tableName, CompactType compactType) + throws IOException, InterruptedException; + + /** + * Major compact a column family within a table. Asynchronous operation in that this method requests that a + * Compaction run and then it returns. It does not wait on the completion of Compaction + * (it can take a while). + * + * @param tableName table to compact + * @param columnFamily column family within a table + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if not a mob column family or if a remote or network exception occurs + * @throws InterruptedException + */ + void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType) + throws IOException, InterruptedException; + + /** * Compact all regions on the region server. Asynchronous operation in that this method requests * that a Compaction run and then it returns. It does not wait on the completion of Compaction (it * can take a while). @@ -1736,6 +1790,17 @@ public interface Admin extends Abortable, Closeable { CompactionState getCompactionState(TableName tableName) throws IOException; /** + * Get the current compaction state of a table. It could be in a compaction, or none. + * + * @param tableName table to examine + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @return the current compaction state + * @throws IOException if a remote or network exception occurs + */ + CompactionState getCompactionState(TableName tableName, + CompactType compactType) throws IOException; + + /** * Get the current compaction state of region. It could be in a major compaction, a minor * compaction, both, or none. 
* @@ -2311,71 +2376,6 @@ public interface Admin extends Abortable, Closeable { } /** - * Compact a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). - * - * @param tableName table to compact - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException - * @throws InterruptedException - */ - void compact(TableName tableName, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Compact a column family within a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). - * - * @param tableName table to compact - * @param columnFamily column family within a table - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException if not a mob column family or if a remote or network exception occurs - * @throws InterruptedException - */ - void compact(TableName tableName, byte[] columnFamily, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Major compact a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). - * - * @param tableName table to compact - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException - * @throws InterruptedException - */ - void majorCompact(TableName tableName, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Major compact a column family within a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). 
- * - * @param tableName table to compact - * @param columnFamily column family within a table - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException if not a mob column family or if a remote or network exception occurs - * @throws InterruptedException - */ - void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Get the current compaction state of a table. It could be in a compaction, or none. - * - * @param tableName table to examine - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @return the current compaction state - * @throws IOException if a remote or network exception occurs - */ - CompactionState getCompactionState(TableName tableName, - CompactType compactType) throws IOException; - - /** * Return the set of supported security capabilities. * @throws IOException * @throws UnsupportedOperationException http://git-wip-us.apache.org/repos/asf/hbase/blob/e946d9d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index abd8e5a..2ae51ac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -299,7 +299,9 @@ public interface AsyncAdmin { * was sent to HBase and may need some time to finish the compact operation. * @param tableName table to compact */ - CompletableFuture<Void> compact(TableName tableName); + default CompletableFuture<Void> compact(TableName tableName) { + return compact(tableName, CompactType.NORMAL); + } /** * Compact a column family within a table. 
When the returned CompletableFuture is done, it only @@ -309,7 +311,28 @@ public interface AsyncAdmin { * @param columnFamily column family within a table. If not present, compact the table's all * column families. */ - CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily); + default CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) { + return compact(tableName, columnFamily, CompactType.NORMAL); + } + + /** + * Compact a table. When the returned CompletableFuture is done, it only means the compact request + * was sent to HBase and may need some time to finish the compact operation. + * @param tableName table to compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + */ + CompletableFuture<Void> compact(TableName tableName, CompactType compactType); + + /** + * Compact a column family within a table. When the returned CompletableFuture is done, it only + * means the compact request was sent to HBase and may need some time to finish the compact + * operation. + * @param tableName table to compact + * @param columnFamily column family within a table + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + */ + CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, + CompactType compactType); /** * Compact an individual region. When the returned CompletableFuture is done, it only means the @@ -333,7 +356,29 @@ public interface AsyncAdmin { * request was sent to HBase and may need some time to finish the compact operation. * @param tableName table to major compact */ - CompletableFuture<Void> majorCompact(TableName tableName); + default CompletableFuture<Void> majorCompact(TableName tableName) { + return majorCompact(tableName, CompactType.NORMAL); + } + + /** + * Major compact a column family within a table. 
When the returned CompletableFuture is done, it + * only means the compact request was sent to HBase and may need some time to finish the compact + * operation. + * @param tableName table to major compact + * @param columnFamily column family within a table. If not present, major compact the table's all + * column families. + */ + default CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) { + return majorCompact(tableName, columnFamily, CompactType.NORMAL); + } + + /** + * Major compact a table. When the returned CompletableFuture is done, it only means the compact + * request was sent to HBase and may need some time to finish the compact operation. + * @param tableName table to major compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + */ + CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType); /** * Major compact a column family within a table. When the returned CompletableFuture is done, it @@ -342,8 +387,10 @@ public interface AsyncAdmin { * @param tableName table to major compact * @param columnFamily column family within a table. If not present, major compact the table's all * column families. + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} */ - CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily); + CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, + CompactType compactType); /** * Major compact a region. 
When the returned CompletableFuture is done, it only means the compact @@ -960,7 +1007,19 @@ public interface AsyncAdmin { * @param tableName table to examine * @return the current compaction state wrapped by a {@link CompletableFuture} */ - CompletableFuture<CompactionState> getCompactionState(TableName tableName); + default CompletableFuture<CompactionState> getCompactionState(TableName tableName) { + return getCompactionState(tableName, CompactType.NORMAL); + } + + /** + * Get the current compaction state of a table. It could be in a major compaction, a minor + * compaction, both, or none. + * @param tableName table to examine + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @return the current compaction state wrapped by a {@link CompletableFuture} + */ + CompletableFuture<CompactionState> getCompactionState(TableName tableName, + CompactType compactType); /** * Get the current compaction state of region. It could be in a major compaction, a minor http://git-wip-us.apache.org/repos/asf/hbase/blob/e946d9d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index fb16fce..0f0679d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -244,13 +244,15 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> compact(TableName tableName) { - return wrap(rawAdmin.compact(tableName)); + public CompletableFuture<Void> compact(TableName tableName, + CompactType compactType) { + return wrap(rawAdmin.compact(tableName, compactType)); } @Override - public CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily) { - return wrap(rawAdmin.compact(tableName, columnFamily)); + public CompletableFuture<Void> compact(TableName tableName, + byte[] columnFamily, CompactType compactType) { + return wrap(rawAdmin.compact(tableName, columnFamily, compactType)); } @Override @@ -264,13 +266,14 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> majorCompact(TableName tableName) { - return wrap(rawAdmin.majorCompact(tableName)); + public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { + return wrap(rawAdmin.majorCompact(tableName, compactType)); } @Override - public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) { - return wrap(rawAdmin.majorCompact(tableName, columnFamily)); + public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, + CompactType compactType) { + return wrap(rawAdmin.majorCompact(tableName, columnFamily, compactType)); } @Override @@ -632,8 +635,9 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<CompactionState> getCompactionState(TableName tableName) { - return wrap(rawAdmin.getCompactionState(tableName)); + public CompletableFuture<CompactionState> getCompactionState( + TableName tableName, CompactType compactType) { + return wrap(rawAdmin.getCompactionState(tableName, compactType)); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/e946d9d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 05157dd..1a00efe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1282,8 +1282,8 @@ public class 
HBaseAdmin implements Admin { CompactType compactType) throws IOException { switch (compactType) { case MOB: - compact(this.connection.getAdminForMaster(), getMobRegionInfo(tableName), major, - columnFamily); + compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName), + major, columnFamily); break; case NORMAL: checkTableExists(tableName); @@ -3240,7 +3240,7 @@ public class HBaseAdmin implements Admin { new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() { @Override public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception { - RegionInfo info = getMobRegionInfo(tableName); + RegionInfo info = RegionInfo.createMobRegionInfo(tableName); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true); GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request); @@ -3304,7 +3304,7 @@ public class HBaseAdmin implements Admin { } break; default: - throw new IllegalArgumentException("Unknowne compactType: " + compactType); + throw new IllegalArgumentException("Unknown compactType: " + compactType); } if (state != null) { return ProtobufUtil.createCompactionState(state); @@ -3839,11 +3839,6 @@ public class HBaseAdmin implements Admin { }); } - private RegionInfo getMobRegionInfo(TableName tableName) { - return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(".mob")).setRegionId(0) - .build(); - } - private RpcControllerFactory getRpcControllerFactory() { return this.rpcControllerFactory; } http://git-wip-us.apache.org/repos/asf/hbase/blob/e946d9d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index f56e7ca..5e9356a 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -842,15 +842,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> compact(TableName tableName) { - return compact(tableName, null, false, CompactType.NORMAL); + public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { + return compact(tableName, null, false, compactType); } @Override - public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) { - Preconditions.checkNotNull(columnFamily, - "columnFamily is null. If you don't specify a columnFamily, use compact(TableName) instead"); - return compact(tableName, columnFamily, false, CompactType.NORMAL); + public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, + CompactType compactType) { + Preconditions.checkNotNull(columnFamily, "columnFamily is null. " + + "If you don't specify a columnFamily, use compact(TableName) instead"); + return compact(tableName, columnFamily, false, compactType); } @Override @@ -866,15 +867,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> majorCompact(TableName tableName) { - return compact(tableName, null, true, CompactType.NORMAL); + public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { + return compact(tableName, null, true, compactType); } @Override - public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) { + public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, + CompactType compactType) { Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
- + " If you don't specify a columnFamily, use majorCompact(TableName) instead"); - return compact(tableName, columnFamily, true, CompactType.NORMAL); + + " If you don't specify a columnFamily, use majorCompact(TableName, CompactType) instead"); + return compact(tableName, columnFamily, true, compactType); } @Override @@ -926,6 +928,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, boolean major) { CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete( (location, err) -> { if (err != null) { @@ -981,31 +984,51 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { /** * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() */ - private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, - CompactType compactType) { - if (CompactType.MOB.equals(compactType)) { - // TODO support MOB compact. - return failedFuture(new UnsupportedOperationException("MOB compact does not support")); - } + private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, + boolean major, CompactType compactType) { CompletableFuture<Void> future = new CompletableFuture<>(); - getTableHRegionLocations(tableName).whenComplete((locations, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null) - .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) - .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) - .toArray(CompletableFuture<?>[]::new); - // future complete unless all of the compact futures are completed. 
- CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + + switch (compactType) { + case MOB: + connection.registry.getMasterAddress().whenComplete((serverName, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); + compact(serverName, regionInfo, major, columnFamily) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + break; + case NORMAL: + getTableHRegionLocations(tableName).whenComplete((locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null) + .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) + .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) + .toArray(CompletableFuture<?>[]::new); + // the returned future completes only after all of the compact futures have completed. 
+ CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + break; + default: + throw new IllegalArgumentException("Unknown compactType: " + compactType); + } return future; } @@ -2741,64 +2764,99 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<CompactionState> getCompactionState(TableName tableName) { + public CompletableFuture<CompactionState> getCompactionState(TableName tableName, + CompactType compactType) { CompletableFuture<CompactionState> future = new CompletableFuture<>(); - getTableHRegionLocations(tableName).whenComplete( - (locations, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - List<CompactionState> regionStates = new ArrayList<>(); - List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); - locations.stream().filter(loc -> loc.getServerName() != null) - .filter(loc -> loc.getRegion() != null) - .filter(loc -> !loc.getRegion().isOffline()) - .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { - futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { - // If any region compaction state is MAJOR_AND_MINOR - // the table compaction state is MAJOR_AND_MINOR, too. 
- if (err2 != null) { - future.completeExceptionally(err2); - } else if (regionState == CompactionState.MAJOR_AND_MINOR) { - future.complete(regionState); - } else { - regionStates.add(regionState); - } - })); - }); - CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) - .whenComplete((ret, err3) -> { - // If future not completed, check all regions's compaction state - if (!future.isCompletedExceptionally() && !future.isDone()) { - CompactionState state = CompactionState.NONE; - for (CompactionState regionState : regionStates) { - switch (regionState) { - case MAJOR: - if (state == CompactionState.MINOR) { - future.complete(CompactionState.MAJOR_AND_MINOR); + switch (compactType) { + case MOB: + connection.registry.getMasterAddress().whenComplete((serverName, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); + + this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action( + (controller, stub) -> this + .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( + controller, stub, + RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), + (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp) + ).call().whenComplete((resp2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + if (resp2.hasCompactionState()) { + future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); + } else { + future.complete(CompactionState.NONE); + } + } + }); + }); + break; + case NORMAL: + getTableHRegionLocations(tableName).whenComplete( + (locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List<CompactionState> regionStates = new ArrayList<>(); + List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); + locations.stream().filter(loc -> loc.getServerName() != null) + 
.filter(loc -> loc.getRegion() != null) + .filter(loc -> !loc.getRegion().isOffline()) + .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { + futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { + // If any region compaction state is MAJOR_AND_MINOR + // the table compaction state is MAJOR_AND_MINOR, too. + if (err2 != null) { + future.completeExceptionally(err2); + } else if (regionState == CompactionState.MAJOR_AND_MINOR) { + future.complete(regionState); } else { - state = CompactionState.MAJOR; + regionStates.add(regionState); } - break; - case MINOR: - if (state == CompactionState.MAJOR) { - future.complete(CompactionState.MAJOR_AND_MINOR); - } else { - state = CompactionState.MINOR; + })); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) + .whenComplete((ret, err3) -> { + // If the future is not yet completed, check every region's compaction state + if (!future.isCompletedExceptionally() && !future.isDone()) { + CompactionState state = CompactionState.NONE; + for (CompactionState regionState : regionStates) { + switch (regionState) { + case MAJOR: + if (state == CompactionState.MINOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MAJOR; + } + break; + case MINOR: + if (state == CompactionState.MAJOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MINOR; + } + break; + case NONE: + default: + } + if (!future.isDone()) { + future.complete(state); + } } - break; - case NONE: - default: } - if (!future.isDone()) { - future.complete(state); - } - } - } - }); - }); + }); + }); + break; + default: + throw new IllegalArgumentException("Unknown compactType: " + compactType); + } + return future; } http://git-wip-us.apache.org/repos/asf/hbase/blob/e946d9d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java ---------------------------------------------------------------------- diff 
--git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java index 0eb4e42..cfca6da 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java @@ -573,6 +573,17 @@ public interface RegionInfo { } /** + * Creates a RegionInfo object for MOB data. + * + * @param tableName the name of the table + * @return the MOB {@link RegionInfo}. + */ + static RegionInfo createMobRegionInfo(TableName tableName) { + return RegionInfoBuilder.newBuilder(tableName) + .setStartKey(Bytes.toBytes(".mob")).setRegionId(0).build(); + } + + /** * Separate elements of a regionName. * @param regionName * @return Array of byte[] containing tableName, startKey and id http://git-wip-us.apache.org/repos/asf/hbase/blob/e946d9d8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 8a1afab..e6cffd6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.testclassification.ClientTests; import 
org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; @@ -422,6 +424,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { assertEquals(count, 2); } + private void waitUntilMobCompactionFinished(TableName tableName) + throws ExecutionException, InterruptedException { + long finished = EnvironmentEdgeManager.currentTime() + 60000; + CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); + while (EnvironmentEdgeManager.currentTime() < finished) { + if (state == CompactionState.NONE) { + break; + } + Thread.sleep(10); + state = admin.getCompactionState(tableName, CompactType.MOB).get(); + } + assertEquals(CompactionState.NONE, state); + } + + @Test + public void testCompactMob() throws Exception { + ColumnFamilyDescriptor columnDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob")) + .setMobEnabled(true).setMobThreshold(0).build(); + + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .addColumnFamily(columnDescriptor).build(); + + admin.createTable(tableDescriptor).get(); + + byte[][] families = { Bytes.toBytes("mob") }; + loadData(tableName, families, 3000, 8); + + admin.majorCompact(tableName, CompactType.MOB).get(); + + CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); + assertNotEquals(CompactionState.NONE, state); + + waitUntilMobCompactionFinished(tableName); + } + @Test public void testCompactRegionServer() throws Exception { byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };