HBASE-18316 Implement async admin operations for draining region servers
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c48bb671 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c48bb671 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c48bb671 Branch: refs/heads/HBASE-14070.HLC Commit: c48bb67123e7bd622c567393097d81665dc5fff8 Parents: bc8ebc6 Author: Guanghao Zhang <zg...@apache.org> Authored: Sun Jul 9 19:51:59 2017 +0800 Committer: Guanghao Zhang <zg...@apache.org> Committed: Sun Jul 9 19:51:59 2017 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/AsyncAdmin.java | 19 +++- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 15 +++ .../apache/hadoop/hbase/client/HBaseAdmin.java | 21 +--- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 44 ++++++++ .../hbase/shaded/protobuf/RequestConverter.java | 23 ++++- .../hbase/client/TestAsyncDrainAdminApi.java | 101 +++++++++++++++++++ 6 files changed, 203 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c48bb671/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 8411a5b..65c9faf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; import java.util.List; import java.util.Collection; import java.util.Map; @@ -772,6 +771,24 @@ public interface AsyncAdmin { CompletableFuture<List<ProcedureInfo>> listProcedures(); /** + * Mark a region server as draining to prevent additional regions from getting assigned to it. + * @param servers + */ + CompletableFuture<Void> drainRegionServers(List<ServerName> servers); + + /** + * List region servers marked as draining to not get additional regions assigned to them. + * @return List of draining region servers wrapped by {@link CompletableFuture} + */ + CompletableFuture<List<ServerName>> listDrainingRegionServers(); + + /** + * Remove drain from a region server to allow additional regions assignments. + * @param servers List of region servers to remove drain from. + */ + CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers); + + /** * @return cluster status wrapped by {@link CompletableFuture} */ CompletableFuture<ClusterStatus> getClusterStatus(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c48bb671/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 7c572db..311bda4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -439,6 +439,21 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) { + return wrap(rawAdmin.drainRegionServers(servers)); + } + + @Override + public CompletableFuture<List<ServerName>> listDrainingRegionServers() { + return wrap(rawAdmin.listDrainingRegionServers()); + } + + @Override + public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) { + return wrap(rawAdmin.removeDrainFromRegionServers(servers)); + } + + @Override public CompletableFuture<ClusterStatus> getClusterStatus() { return wrap(rawAdmin.getClusterStatus()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c48bb671/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 7518b9c..3b099ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -4036,19 +4036,11 @@ public class HBaseAdmin implements Admin { @Override public void drainRegionServers(List<ServerName> servers) throws IOException { - final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); - for (ServerName server : servers) { - // Parse to ServerName to do simple validation. - ServerName.parseServerName(server.toString()); - pbServers.add(ProtobufUtil.toServerName(server)); - } - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override public Void rpcCall() throws ServiceException { - DrainRegionServersRequest req = - DrainRegionServersRequest.newBuilder().addAllServerName(pbServers).build(); - master.drainRegionServers(getRpcController(), req); + master.drainRegionServers(getRpcController(), + RequestConverter.buildDrainRegionServersRequest(servers)); return null; } }); @@ -4073,17 +4065,10 @@ public class HBaseAdmin implements Admin { @Override public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException { - final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); - for (ServerName server : servers) { - pbServers.add(ProtobufUtil.toServerName(server)); - } - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override public Void rpcCall() throws ServiceException { - RemoveDrainFromRegionServersRequest req = RemoveDrainFromRegionServersRequest.newBuilder() - .addAllServerName(pbServers).build(); - master.removeDrainFromRegionServers(getRpcController(), req); + master.removeDrainFromRegionServers(getRpcController(), RequestConverter.buildRemoveDrainFromRegionServersRequest(servers)); return null; } }); http://git-wip-us.apache.org/repos/asf/hbase/blob/c48bb671/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index e8c15a5..5ba8248 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -118,6 +118,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTab import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; @@ -156,6 +158,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest; @@ -176,6 +180,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; @@ -1915,6 +1921,44 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { .collect(Collectors.toList()))).call(); } + @Override + public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<DrainRegionServersRequest, DrainRegionServersResponse, Void> call(controller, stub, + RequestConverter.buildDrainRegionServersRequest(servers), + (s, c, req, done) -> s.drainRegionServers(c, req, done), resp -> null)).call(); + } + + @Override + public CompletableFuture<List<ServerName>> listDrainingRegionServers() { + return this + .<List<ServerName>> newMasterCaller() + .action( + (controller, stub) -> this + .<ListDrainingRegionServersRequest, ListDrainingRegionServersResponse, List<ServerName>> call( + controller, + stub, + ListDrainingRegionServersRequest.newBuilder().build(), + (s, c, req, done) -> s.listDrainingRegionServers(c, req, done), + resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) + .collect(Collectors.toList()))).call(); + } + + @Override + public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<RemoveDrainFromRegionServersRequest, RemoveDrainFromRegionServersResponse, Void> call( + controller, stub, RequestConverter + .buildRemoveDrainFromRegionServersRequest(servers), (s, c, req, done) -> s + .removeDrainFromRegionServers(c, req, done), resp -> null)).call(); + } + /** * Get the region location for the passed region name. The region name may be a full region name * or encoded region name. If the region does not found, then it'll throw an http://git-wip-us.apache.org/repos/asf/hbase/blob/c48bb671/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 5c44b4e..8f726ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; - import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; @@ -93,9 +93,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; @@ -1808,4 +1810,23 @@ public final class RequestConverter { public static GetQuotaStatesRequest buildGetQuotaStatesRequest() { return GET_QUOTA_STATES_REQUEST; } + + public static DrainRegionServersRequest buildDrainRegionServersRequest(List<ServerName> servers) { + return DrainRegionServersRequest.newBuilder().addAllServerName(toProtoServerNames(servers)) + .build(); + } + + public static RemoveDrainFromRegionServersRequest buildRemoveDrainFromRegionServersRequest( + List<ServerName> servers) { + return RemoveDrainFromRegionServersRequest.newBuilder() + .addAllServerName(toProtoServerNames(servers)).build(); + } + + private static List<HBaseProtos.ServerName> toProtoServerNames(List<ServerName> servers) { + List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); + for (ServerName server : servers) { + pbServers.add(ProtobufUtil.toServerName(server)); + } + return pbServers; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c48bb671/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java new file mode 100644 index 0000000..88bda10 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncDrainAdminApi extends TestAsyncAdminBase { + + /* + * This test drains all regions so cannot be run in parallel with other tests. + */ + @Ignore @Test(timeout = 30000) + public void testDrainRegionServers() throws Exception { + List<ServerName> drainingServers = admin.listDrainingRegionServers().get(); + assertTrue(drainingServers.isEmpty()); + + // Drain all region servers. + Collection<ServerName> clusterServers = admin.getRegionServers().get(); + drainingServers = new ArrayList<>(); + for (ServerName server : clusterServers) { + drainingServers.add(server); + } + admin.drainRegionServers(drainingServers).join(); + + // Check that drain lists all region servers. + drainingServers = admin.listDrainingRegionServers().get(); + assertEquals(clusterServers.size(), drainingServers.size()); + for (ServerName server : clusterServers) { + assertTrue(drainingServers.contains(server)); + } + + // Try for 20 seconds to create table (new region). Will not complete because all RSs draining. + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()); + final Runnable createTable = new Thread() { + @Override + public void run() { + try { + admin.createTable(builder.build()).join(); + } catch (Exception ioe) { + assertTrue(false); // Should not get IOException. + } + } + }; + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final java.util.concurrent.Future<?> future = executor.submit(createTable); + executor.shutdown(); + try { + future.get(20, TimeUnit.SECONDS); + } catch (TimeoutException ie) { + assertTrue(true); // Expecting timeout to happen. + } + + // Kill executor if still processing. + if (!executor.isTerminated()) { + executor.shutdownNow(); + assertTrue(true); + } + + // Remove drain list. + admin.removeDrainFromRegionServers(drainingServers); + drainingServers = admin.listDrainingRegionServers().get(); + assertTrue(drainingServers.isEmpty()); + } +}